diff --git a/src/main/java/com/google/api/gax/core/ConnectionSettings.java b/src/main/java/com/google/api/gax/core/ConnectionSettings.java index dfe1e9fa7..1098ffb17 100644 --- a/src/main/java/com/google/api/gax/core/ConnectionSettings.java +++ b/src/main/java/com/google/api/gax/core/ConnectionSettings.java @@ -56,25 +56,19 @@ @AutoValue public abstract class ConnectionSettings { - /* - * package-private so that the AutoValue derived class can access it - */ - interface CredentialsProvider { - Credentials getCredentials() throws IOException; - } - /** - * Gets the credentials which will be used to call the service. If the credentials - * have not been acquired yet, then they will be acquired when this function is called. + * Gets the credentials which will be used to call the service. If the credentials have not been + * acquired yet, then they will be acquired when this function is called. */ public Credentials getCredentials() throws IOException { return getCredentialsProvider().getCredentials(); } - /* - * package-private so that the AutoValue derived class can access it + /** + * The credentials to use in order to call the service. Credentials will not be acquired until + * they are required. */ - abstract CredentialsProvider getCredentialsProvider(); + public abstract CredentialsProvider getCredentialsProvider(); /** * The address used to reach the service. @@ -97,10 +91,10 @@ public Builder toBuilder() { @AutoValue.Builder public abstract static class Builder { - /* - * package-private so that the AutoValue derived class can access it + /** + * Set the credentials to use in order to call the service. */ - abstract Builder setCredentialsProvider(CredentialsProvider provider); + public abstract Builder setCredentialsProvider(CredentialsProvider provider); /** * Sets the credentials to use in order to call the service. diff --git a/src/main/java/com/google/api/gax/core/CredentialsProvider.java b/src/main/java/com/google/api/gax/core/CredentialsProvider.java new file mode 100644 index 000000000..0e19f9a72 --- /dev/null +++ b/src/main/java/com/google/api/gax/core/CredentialsProvider.java @@ -0,0 +1,16 @@ +package com.google.api.gax.core; + +import com.google.auth.Credentials; + +import java.io.IOException; + +/** + * Provides an interface to hold and acquire the credentials that will be used to call the service. + */ +public interface CredentialsProvider { + /** + * Gets the credentials which will be used to call the service. If the credentials have not been + * acquired yet, then they will be acquired when this function is called. + */ + Credentials getCredentials() throws IOException; +} diff --git a/src/main/java/com/google/api/gax/grpc/ApiCallSettingsTyped.java b/src/main/java/com/google/api/gax/grpc/ApiCallSettingsTyped.java index 16c0135c2..d889076f9 100644 --- a/src/main/java/com/google/api/gax/grpc/ApiCallSettingsTyped.java +++ b/src/main/java/com/google/api/gax/grpc/ApiCallSettingsTyped.java @@ -7,7 +7,6 @@ import io.grpc.MethodDescriptor; import io.grpc.Status; -import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; /** @@ -39,13 +38,11 @@ protected ApiCallSettingsTyped(ImmutableSet retryableCodes, } protected ApiCallable createBaseCallable( - ServiceApiSettings serviceSettings) throws IOException { + ManagedChannel channel, ScheduledExecutorService executor) { ClientCallFactory clientCallFactory = new DescriptorClientCallFactory<>(methodDescriptor); ApiCallable callable = new ApiCallable<>(new DirectCallable<>(clientCallFactory), this); - ManagedChannel channel = serviceSettings.getChannel(); - ScheduledExecutorService executor = serviceSettings.getExecutor(); if (getRetryableCodes() != null) { callable = callable.retryableOn(ImmutableSet.copyOf(getRetryableCodes())); } diff --git a/src/main/java/com/google/api/gax/grpc/ApiCallable.java b/src/main/java/com/google/api/gax/grpc/ApiCallable.java index 5cdab666b..ccbf65ab4 100644 --- a/src/main/java/com/google/api/gax/grpc/ApiCallable.java +++ b/src/main/java/com/google/api/gax/grpc/ApiCallable.java @@ -42,10 +42,10 @@ import com.google.common.util.concurrent.UncheckedExecutionException; import io.grpc.Channel; +import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.StatusRuntimeException; -import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; import javax.annotation.Nullable; @@ -103,69 +103,72 @@ public final class ApiCallable { private final ApiCallSettings settings; /** - * Create a callable object that represents a simple API method. - * Public only for technical reasons - for advanced usage + * Create a callable object that represents a simple API method. Public only for technical reasons + * - for advanced usage * - * @param simpleCallSettings {@link com.google.api.gax.grpc.SimpleCallSettings} to configure - * the method-level settings with. - * @param serviceSettings{@link com.google.api.gax.grpc.ServiceApiSettings} - * to configure the service-level settings with. + * @param simpleCallSettings {@link com.google.api.gax.grpc.SimpleCallSettings} to configure the + * method-level settings with. + * @param channel {@link ManagedChannel} to use to connect to the service. + * @param executor {@link ScheduledExecutorService} to use when connecting to the service. * @return {@link com.google.api.gax.grpc.ApiCallable} callable object. */ public static ApiCallable create( SimpleCallSettings simpleCallSettings, - ServiceApiSettings serviceSettings) throws IOException { - return simpleCallSettings.create(serviceSettings); + ManagedChannel channel, + ScheduledExecutorService executor) { + return simpleCallSettings.create(channel, executor); } /** - * Create a paged callable object that represents a page-streaming API method. - * Public only for technical reasons - for advanced usage + * Create a paged callable object that represents a page-streaming API method. Public only for + * technical reasons - for advanced usage * * @param pageStreamingCallSettings {@link com.google.api.gax.grpc.PageStreamingCallSettings} to * configure the page-streaming related settings with. - * @param serviceSettings{@link com.google.api.gax.grpc.ServiceApiSettings} - * to configure the service-level settings with. + * @param channel {@link ManagedChannel} to use to connect to the service. + * @param executor {@link ScheduledExecutorService} to use to when connecting to the service. * @return {@link com.google.api.gax.grpc.ApiCallable} callable object. */ public static ApiCallable> createPagedVariant( PageStreamingCallSettings pageStreamingCallSettings, - ServiceApiSettings serviceSettings) throws IOException { - return pageStreamingCallSettings.createPagedVariant(serviceSettings); + ManagedChannel channel, + ScheduledExecutorService executor) { + return pageStreamingCallSettings.createPagedVariant(channel, executor); } /** - * Create a base callable object that represents a page-streaming API method. - * Public only for technical reasons - for advanced usage + * Create a base callable object that represents a page-streaming API method. Public only for + * technical reasons - for advanced usage * * @param pageStreamingCallSettings {@link com.google.api.gax.grpc.PageStreamingCallSettings} to * configure the page-streaming related settings with. - * @param serviceSettings{@link com.google.api.gax.grpc.ServiceApiSettings} - * to configure the service-level settings with. + * @param channel {@link ManagedChannel} to use to connect to the service. + * @param executor {@link ScheduledExecutorService} to use to when connecting to the service. * @return {@link com.google.api.gax.grpc.ApiCallable} callable object. */ - public static - ApiCallable create( - PageStreamingCallSettings pageStreamingCallSettings, - ServiceApiSettings serviceSettings) throws IOException { - return pageStreamingCallSettings.create(serviceSettings); + public static ApiCallable create( + PageStreamingCallSettings pageStreamingCallSettings, + ManagedChannel channel, + ScheduledExecutorService executor) { + return pageStreamingCallSettings.create(channel, executor); } /** - * Create a callable object that represents a bundling API method. - * Public only for technical reasons - for advanced usage + * Create a callable object that represents a bundling API method. Public only for technical + * reasons - for advanced usage * - * @param bundlingCallSettings {@link com.google.api.gax.grpc.BundlingSettings} to configure - * the bundling related settings with. - * @param serviceSettings{@link com.google.api.gax.grpc.ServiceApiSettings} - * to configure the service-level settings with. + * @param bundlingCallSettings {@link com.google.api.gax.grpc.BundlingSettings} to configure the + * bundling related settings with. + * @param channel {@link ManagedChannel} to use to connect to the service. + * @param executor {@link ScheduledExecutorService} to use to when connecting to the service. * @return {@link com.google.api.gax.grpc.ApiCallable} callable object. */ public static ApiCallable create( BundlingCallSettings bundlingCallSettings, - ServiceApiSettings serviceSettings) throws IOException { - return bundlingCallSettings.create(serviceSettings); + ManagedChannel channel, + ScheduledExecutorService executor) { + return bundlingCallSettings.create(channel, executor); } /** diff --git a/src/main/java/com/google/api/gax/grpc/ApiException.java b/src/main/java/com/google/api/gax/grpc/ApiException.java index 1c586b9ac..2d7b4ff0b 100644 --- a/src/main/java/com/google/api/gax/grpc/ApiException.java +++ b/src/main/java/com/google/api/gax/grpc/ApiException.java @@ -38,7 +38,9 @@ /** * Represents an exception thrown during an RPC call. * - *

It stores information useful for functionalities in {@link ApiCallable}. + *

+ * It stores information useful for functionalities in {@link ApiCallable}. For more information + * about the status codes returned by the underlying grpc exception see {@link Status}. */ public class ApiException extends RuntimeException { private final Status.Code statusCode; @@ -58,9 +60,9 @@ public boolean isRetryable() { } /** - * Returns the status code of the underlying grpc exception. In cases - * where the underlying exception is not of type StatusException or - * StatusRuntimeException, the status code will be Status.Code.UNKNOWN + * Returns the status code of the underlying grpc exception. In cases where the underlying + * exception is not of type StatusException or StatusRuntimeException, the status code will be + * Status.Code.UNKNOWN. For more information about status codes see {@link Status}. */ public Status.Code getStatusCode() { return statusCode; diff --git a/src/main/java/com/google/api/gax/grpc/BundlingCallSettings.java b/src/main/java/com/google/api/gax/grpc/BundlingCallSettings.java index 2765cdfd7..c83df6b76 100644 --- a/src/main/java/com/google/api/gax/grpc/BundlingCallSettings.java +++ b/src/main/java/com/google/api/gax/grpc/BundlingCallSettings.java @@ -3,11 +3,12 @@ import com.google.api.gax.core.RetrySettings; import com.google.common.collect.ImmutableSet; +import io.grpc.ManagedChannel; import io.grpc.MethodDescriptor; import io.grpc.Status; -import java.io.IOException; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; /** * A settings class to configure an ApiCallable for calls to an API method that supports @@ -23,8 +24,8 @@ public final class BundlingCallSettings * Package-private, for use by ApiCallable. */ ApiCallable create( - ServiceApiSettings serviceSettings) throws IOException { - ApiCallable baseCallable = createBaseCallable(serviceSettings); + ManagedChannel channel, ScheduledExecutorService executor) { + ApiCallable baseCallable = createBaseCallable(channel, executor); bundlerFactory = new BundlerFactory<>(bundlingDescriptor, bundlingSettings); return baseCallable.bundling(bundlingDescriptor, bundlerFactory); } diff --git a/src/main/java/com/google/api/gax/grpc/ChannelProvider.java b/src/main/java/com/google/api/gax/grpc/ChannelProvider.java new file mode 100644 index 000000000..6fec85763 --- /dev/null +++ b/src/main/java/com/google/api/gax/grpc/ChannelProvider.java @@ -0,0 +1,50 @@ +package com.google.api.gax.grpc; + +import com.google.api.gax.core.ConnectionSettings; + +import io.grpc.ManagedChannel; + +import java.io.IOException; +import java.util.concurrent.Executor; + +import javax.annotation.Nullable; + +/** + * Provides an interface to hold and build the channel that will be used. If the channel does not + * already exist, it will be constructed when {@link #getOrBuildChannel} is called. + * + * Implementations of {@link ChannelProvider} may choose to create a new {@link ManagedChannel} for + * each call to {@link #getOrBuildChannel}, or may return a fixed {@link ManagedChannel} instance. + * In cases where the same {@link ManagedChannel} instance is returned, for example by a + * {@link ChannelProvider} created using the {@link ServiceApiSettings} + * provideChannelWith(ManagedChannel, boolean) method, and shouldAutoClose returns true, the + * {@link #getOrBuildChannel} method will throw an {@link IllegalStateException} if it is called + * more than once. This is to prevent the same {@link ManagedChannel} being closed prematurely when + * it is used by multiple client objects. + */ +public interface ChannelProvider { + /** + * Connection settings used to build the channel. If a channel is provided directly this will be + * set to null. + */ + @Nullable + ConnectionSettings connectionSettings(); + + /** + * Indicates whether the channel should be closed by the containing API class. + */ + boolean shouldAutoClose(); + + /** + * Get the channel to be used to connect to the service. The first time this is called, if the + * channel does not already exist, it will be created. The {@link Executor} will only be used when + * the channel is created. For implementations returning a fixed {@link ManagedChannel} object, + * the executor is unused. + * + * If the {@link ChannelProvider} is configured to return a fixed {@link ManagedChannel} object + * and to return shouldAutoClose as true, then after the first call to {@link #getOrBuildChannel}, + * subsequent calls should throw an {@link IllegalStateException}. See interface level docs for + * {@link ChannelProvider} for more details. + */ + ManagedChannel getOrBuildChannel(Executor executor) throws IOException; +} diff --git a/src/main/java/com/google/api/gax/grpc/ExecutorProvider.java b/src/main/java/com/google/api/gax/grpc/ExecutorProvider.java new file mode 100644 index 000000000..61204d260 --- /dev/null +++ b/src/main/java/com/google/api/gax/grpc/ExecutorProvider.java @@ -0,0 +1,36 @@ +package com.google.api.gax.grpc; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * Provides an interface to hold and create the Executor to be used. If the executor does not + * already exist, it will be constructed when {@link #getOrBuildExecutor} is called. + * + * Implementations of ExecutorProvider may choose to create a new {@link ScheduledExecutorService} + * for each call to {@link #getOrBuildExecutor}, or may return a fixed + * {@link ScheduledExecutorService} instance. In cases where the same + * {@link ScheduledExecutorService} instance is returned, for example by an {@link ExecutorProvider} + * created using the {@link ServiceApiSettings} provideExecutorWith(ScheduledExecutorService, + * boolean) method, and shouldAutoClose returns true, the {@link #getOrBuildExecutor} method will + * throw an {@link IllegalStateException} if it is called more than once. This is to prevent the + * same {@link ScheduledExecutorService} being closed prematurely when it is used by multiple client + * objects. + */ +public interface ExecutorProvider { + /** + * Indicates whether the channel should be closed by the containing API class. + */ + boolean shouldAutoClose(); + + /** + * Get the executor to be used to connect to the service. The first time this is called, if the + * executor does not already exist, it will be created. + * + * If the {@link ExecutorProvider} is configured to return a fixed + * {@link ScheduledExecutorService} object and to return shouldAutoClose as true, then after the + * first call to {@link #getOrBuildExecutor}, subsequent calls should throw an + * {@link IllegalStateException}. See interface level docs for {@link ExecutorProvider} for more + * details. + */ + ScheduledExecutorService getOrBuildExecutor(); +} diff --git a/src/main/java/com/google/api/gax/grpc/PageStreamingCallSettings.java b/src/main/java/com/google/api/gax/grpc/PageStreamingCallSettings.java index 3cb1db69e..336cbf52d 100644 --- a/src/main/java/com/google/api/gax/grpc/PageStreamingCallSettings.java +++ b/src/main/java/com/google/api/gax/grpc/PageStreamingCallSettings.java @@ -4,11 +4,12 @@ import com.google.api.gax.core.RetrySettings; import com.google.common.collect.ImmutableSet; +import io.grpc.ManagedChannel; import io.grpc.MethodDescriptor; import io.grpc.Status; -import java.io.IOException; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; /** @@ -23,16 +24,16 @@ public final class PageStreamingCallSettings * Package-private, for use by ApiCallable. */ ApiCallable create( - ServiceApiSettings serviceSettings) throws IOException { - return createBaseCallable(serviceSettings); + ManagedChannel channel, ScheduledExecutorService executor) { + return createBaseCallable(channel, executor); } /** * Package-private, for use by ApiCallable. */ ApiCallable> createPagedVariant( - ServiceApiSettings serviceSettings) throws IOException { - ApiCallable baseCallable = createBaseCallable(serviceSettings); + ManagedChannel channel, ScheduledExecutorService executor) { + ApiCallable baseCallable = createBaseCallable(channel, executor); return baseCallable.pageStreaming(pageDescriptor); } diff --git a/src/main/java/com/google/api/gax/grpc/ServiceApiSettings.java b/src/main/java/com/google/api/gax/grpc/ServiceApiSettings.java index ec6335b35..bebcf7250 100644 --- a/src/main/java/com/google/api/gax/grpc/ServiceApiSettings.java +++ b/src/main/java/com/google/api/gax/grpc/ServiceApiSettings.java @@ -19,8 +19,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import javax.annotation.Nullable; - /** * A base settings class to configure a service API class. * @@ -51,12 +49,8 @@ */ public abstract class ServiceApiSettings { - private final ManagedChannel channel; - private final boolean shouldAutoCloseChannel; - private final ScheduledExecutorService executor; - - @Nullable - private final ConnectionSettings connectionSettings; + private final ChannelProvider channelProvider; + private final ExecutorProvider executorProvider; private final String generatorName; private final String generatorVersion; @@ -66,34 +60,35 @@ public abstract class ServiceApiSettings { /** * Constructs an instance of ServiceApiSettings. */ - protected ServiceApiSettings(ManagedChannel channel, - boolean shouldAutoCloseChannel, - ScheduledExecutorService executor, - ConnectionSettings connectionSettings, - String generatorName, - String generatorVersion, - String clientLibName, - String clientLibVersion) { - this.channel = channel; - this.executor = executor; - this.connectionSettings = connectionSettings; - this.shouldAutoCloseChannel = shouldAutoCloseChannel; + protected ServiceApiSettings( + ChannelProvider channelProvider, + ExecutorProvider executorProvider, + String generatorName, + String generatorVersion, + String clientLibName, + String clientLibVersion) { + this.channelProvider = channelProvider; + this.executorProvider = executorProvider; this.clientLibName = clientLibName; this.clientLibVersion = clientLibVersion; this.generatorName = generatorName; this.generatorVersion = generatorVersion; } - public final ManagedChannel getChannel() { - return channel; - } - - public final ScheduledExecutorService getExecutor() { - return executor; + /** + * Return the channel provider. If no channel provider was set, the default channel provider will + * be returned. + */ + public final ChannelProvider getChannelProvider() { + return channelProvider; } - public final boolean shouldAutoCloseChannel() { - return shouldAutoCloseChannel; + /** + * Return the executor provider. It no executor provider was set, the default executor provider + * will be returned. + */ + public final ExecutorProvider getExecutorProvider() { + return executorProvider; } public abstract static class Builder { @@ -114,16 +109,6 @@ public abstract static class Builder { private ChannelProvider channelProvider; private ExecutorProvider executorProvider; - private interface ChannelProvider { - ConnectionSettings connectionSettings(); - boolean shouldAutoClose(); - ManagedChannel getChannel(Executor executor) throws IOException; - } - - private interface ExecutorProvider { - ScheduledExecutorService getExecutor(); - } - protected Builder(ConnectionSettings connectionSettings) { this(); channelProvider = createChannelProvider(connectionSettings); @@ -134,11 +119,8 @@ protected Builder(ConnectionSettings connectionSettings) { */ protected Builder(ServiceApiSettings settings) { this(); - if (settings.connectionSettings != null) { - channelProvider = createChannelProvider(settings.connectionSettings); - } else { - channelProvider = createChannelProvider(settings.channel, settings.shouldAutoCloseChannel); - } + this.channelProvider = settings.channelProvider; + this.executorProvider = settings.executorProvider; this.clientLibName = settings.clientLibName; this.clientLibVersion = settings.clientLibVersion; this.serviceGeneratorName = settings.generatorName; @@ -151,44 +133,68 @@ private Builder() { serviceGeneratorName = DEFAULT_GENERATOR_NAME; serviceGeneratorVersion = DEFAULT_VERSION; - executorProvider = new ExecutorProvider() { - private ScheduledExecutorService executor = null; - @Override - public ScheduledExecutorService getExecutor() { - if (executor != null) { - return executor; - } - executor = MoreExecutors.getExitingScheduledExecutorService( - new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREADS)); - return executor; - } - }; + executorProvider = + new ExecutorProvider() { + @Override + public ScheduledExecutorService getOrBuildExecutor() { + return MoreExecutors.getExitingScheduledExecutorService( + new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREADS)); + } + + @Override + public boolean shouldAutoClose() { + return true; + } + }; } - /** * Sets the executor to use for channels, retries, and bundling. * - * It is up to the user to terminate the {@code Executor} when it is no longer needed. + * If multiple Api objects will use this executor, shouldAutoClose must be set to false to + * prevent the {@link ExecutorProvider} from throwing an {@link IllegalStateException}. See + * {@link ExecutorProvider} for more details. */ - public Builder setExecutor(final ScheduledExecutorService executor) { - executorProvider = new ExecutorProvider() { - @Override - public ScheduledExecutorService getExecutor() { - return executor; - } - }; + public Builder provideExecutorWith( + final ScheduledExecutorService executor, final boolean shouldAutoClose) { + executorProvider = + new ExecutorProvider() { + private volatile boolean executorProvided = false; + + @Override + public ScheduledExecutorService getOrBuildExecutor() { + if (executorProvided) { + if (shouldAutoClose) { + throw new IllegalStateException( + "A fixed executor cannot be re-used when shouldAutoClose is set to true. " + + "Try calling provideExecutorWith with shouldAutoClose set to false " + + "or using the default executor."); + } + } else { + executorProvided = true; + } + return executor; + } + + @Override + public boolean shouldAutoClose() { + return shouldAutoClose; + } + }; return this; } /** - * Sets a channel for this ServiceApiSettings to use. This prevents a channel - * from being created. + * Sets a channel for this ServiceApiSettings to use. This prevents a channel from being + * created. * * See class documentation for more details on channels. + * + * If multiple Api objects will use this channel, shouldAutoClose must be set to false to + * prevent the {@link ChannelProvider} from throwing an {@link IllegalStateException}. See + * {@link ChannelProvider} for more details. */ - public Builder provideChannelWith( - final ManagedChannel channel, final boolean shouldAutoClose) { + public Builder provideChannelWith(final ManagedChannel channel, final boolean shouldAutoClose) { channelProvider = createChannelProvider(channel, shouldAutoClose); return this; } @@ -202,26 +208,6 @@ public Builder provideChannelWith( return this; } - /** - * The channel used to send requests to the service. - * - * If no channel was set, a default channel will be instantiated, using - * the connection settings provided. - * - * See class documentation for more details on channels. - */ - public ManagedChannel getOrBuildChannel() throws IOException { - return channelProvider.getChannel(this.getOrBuildExecutor()); - } - - /** - * The Executor used for channels, retries, and bundling.. - * If no executor was set, a default executor will be instantiated. - */ - public ScheduledExecutorService getOrBuildExecutor() { - return executorProvider.getExecutor(); - } - /** * Sets the generator name and version for the GRPC custom header. */ @@ -240,6 +226,14 @@ public Builder setClientLibHeader(String name, String version) { return this; } + public ChannelProvider getChannelProvider() { + return channelProvider; + } + + public ExecutorProvider getExecutorProvider() { + return executorProvider; + } + public String getClientLibName() { return clientLibName; } @@ -256,14 +250,6 @@ public String getGeneratorVersion() { return serviceGeneratorVersion; } - public ConnectionSettings getConnectionSettings() { - return channelProvider.connectionSettings(); - } - - public boolean shouldAutoCloseChannel() { - return channelProvider.shouldAutoClose(); - } - /** * Performs a merge, using only non-null fields */ @@ -288,22 +274,17 @@ protected Builder applyToAllApiMethods( private ChannelProvider createChannelProvider(final ConnectionSettings settings) { return new ChannelProvider() { - private ManagedChannel channel = null; @Override - public ManagedChannel getChannel(Executor executor) throws IOException { - if (channel != null) { - return channel; - } - + public ManagedChannel getOrBuildChannel(Executor executor) throws IOException { List interceptors = Lists.newArrayList(); interceptors.add(new ClientAuthInterceptor(settings.getCredentials(), executor)); interceptors.add(new HeaderInterceptor(serviceHeader())); - channel = NettyChannelBuilder.forAddress(settings.getServiceAddress(), settings.getPort()) + return NettyChannelBuilder.forAddress(settings.getServiceAddress(), settings.getPort()) .negotiationType(NegotiationType.TLS) .intercept(interceptors) + .executor(executor) .build(); - return channel; } @Override @@ -323,9 +304,14 @@ private String serviceHeader() { gaxVersion = DEFAULT_VERSION; } String javaVersion = Runtime.class.getPackage().getImplementationVersion(); - return String.format("%s/%s;%s/%s;gax/%s;java/%s", - clientLibName, clientLibVersion, serviceGeneratorName, serviceGeneratorVersion, - gaxVersion, javaVersion); + return String.format( + "%s/%s;%s/%s;gax/%s;java/%s", + clientLibName, + clientLibVersion, + serviceGeneratorName, + serviceGeneratorVersion, + gaxVersion, + javaVersion); } }; } @@ -333,14 +319,28 @@ private String serviceHeader() { private ChannelProvider createChannelProvider(final ManagedChannel channel, final boolean shouldAutoClose) { return new ChannelProvider() { + private boolean channelProvided = false; + @Override - public ManagedChannel getChannel(Executor executor) { + public ManagedChannel getOrBuildChannel(Executor executor) { + if (channelProvided) { + if (shouldAutoClose) { + throw new IllegalStateException( + "A fixed channel cannot be re-used when shouldAutoClose is set to true. " + + "Try calling provideChannelWith with shouldAutoClose set to false, or " + + "using a channel created from a ConnectionSettings object."); + } + } else { + channelProvided = true; + } return channel; } + @Override public boolean shouldAutoClose() { return shouldAutoClose; } + @Override public ConnectionSettings connectionSettings() { return null; diff --git a/src/main/java/com/google/api/gax/grpc/SimpleCallSettings.java b/src/main/java/com/google/api/gax/grpc/SimpleCallSettings.java index d26e0dd18..55de84bae 100644 --- a/src/main/java/com/google/api/gax/grpc/SimpleCallSettings.java +++ b/src/main/java/com/google/api/gax/grpc/SimpleCallSettings.java @@ -3,11 +3,12 @@ import com.google.api.gax.core.RetrySettings; import com.google.common.collect.ImmutableSet; +import io.grpc.ManagedChannel; import io.grpc.MethodDescriptor; import io.grpc.Status; -import java.io.IOException; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; /** * A settings class to configure an ApiCallable for calls to a simple API method (i.e. that @@ -20,8 +21,8 @@ public final class SimpleCallSettings * Package-private, for use by ApiCallable. */ ApiCallable create( - ServiceApiSettings serviceSettings) throws IOException { - return createBaseCallable(serviceSettings); + ManagedChannel channel, ScheduledExecutorService executor) { + return createBaseCallable(channel, executor); } private SimpleCallSettings(ImmutableSet retryableCodes, diff --git a/src/test/java/com/google/api/gax/grpc/ServiceApiSettingsTest.java b/src/test/java/com/google/api/gax/grpc/ServiceApiSettingsTest.java new file mode 100644 index 000000000..73ff623a9 --- /dev/null +++ b/src/test/java/com/google/api/gax/grpc/ServiceApiSettingsTest.java @@ -0,0 +1,153 @@ +package com.google.api.gax.grpc; + +import com.google.api.gax.core.ConnectionSettings; +import com.google.common.collect.ImmutableList; +import com.google.common.truth.Truth; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +import io.grpc.ManagedChannel; + +import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; + +/** + * Tests for {@link ServiceApiSettings}. + */ +@RunWith(JUnit4.class) +public class ServiceApiSettingsTest { + + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static class FakeSettings extends ServiceApiSettings { + + public static final String DEFAULT_SERVICE_ADDRESS = "pubsub-experimental.googleapis.com"; + public static final int DEFAULT_SERVICE_PORT = 443; + public static final ImmutableList DEFAULT_SERVICE_SCOPES = + ImmutableList.builder() + .add("https://www.googleapis.com/auth/pubsub") + .add("https://www.googleapis.com/auth/cloud-platform") + .build(); + public static final ConnectionSettings DEFAULT_CONNECTION_SETTINGS = + ConnectionSettings.newBuilder() + .setServiceAddress(DEFAULT_SERVICE_ADDRESS) + .setPort(DEFAULT_SERVICE_PORT) + .provideCredentialsWith(DEFAULT_SERVICE_SCOPES) + .build(); + + public static Builder createBuilder(ConnectionSettings connectionSettings) { + return new Builder(connectionSettings); + } + + private FakeSettings(Builder settingsBuilder) throws IOException { + super( + settingsBuilder.getChannelProvider(), + settingsBuilder.getExecutorProvider(), + settingsBuilder.getGeneratorName(), + settingsBuilder.getGeneratorVersion(), + settingsBuilder.getClientLibName(), + settingsBuilder.getClientLibVersion()); + } + + private static class Builder extends ServiceApiSettings.Builder { + + private Builder(ConnectionSettings connectionSettings) { + super(connectionSettings); + } + + @Override + public FakeSettings build() throws IOException { + return new FakeSettings(this); + } + + @Override + public Builder provideExecutorWith( + final ScheduledExecutorService executor, boolean shouldAutoClose) { + super.provideExecutorWith(executor, shouldAutoClose); + return this; + } + + @Override + public Builder provideChannelWith(ManagedChannel channel, boolean shouldAutoClose) { + super.provideChannelWith(channel, shouldAutoClose); + return this; + } + + @Override + public Builder provideChannelWith(ConnectionSettings settings) { + super.provideChannelWith(settings); + return this; + } + } + } + + @Test + public void fixedChannelAutoClose() throws IOException { + thrown.expect(IllegalStateException.class); + ManagedChannel channel = Mockito.mock(ManagedChannel.class); + FakeSettings settings = + FakeSettings.createBuilder(FakeSettings.DEFAULT_CONNECTION_SETTINGS) + .provideChannelWith(channel, true) + .build(); + ChannelProvider channelProvider = settings.getChannelProvider(); + ScheduledExecutorService executor = settings.getExecutorProvider().getOrBuildExecutor(); + ManagedChannel channelA = channelProvider.getOrBuildChannel(executor); + ManagedChannel channelB = channelProvider.getOrBuildChannel(executor); + } + + @Test + public void fixedChannelNoAutoClose() throws IOException { + ManagedChannel channel = Mockito.mock(ManagedChannel.class); + FakeSettings settings = + FakeSettings.createBuilder(FakeSettings.DEFAULT_CONNECTION_SETTINGS) + .provideChannelWith(channel, false) + .build(); + ChannelProvider channelProvider = settings.getChannelProvider(); + ScheduledExecutorService executor = settings.getExecutorProvider().getOrBuildExecutor(); + ManagedChannel channelA = channelProvider.getOrBuildChannel(executor); + ManagedChannel channelB = channelProvider.getOrBuildChannel(executor); + Truth.assertThat(channelA).isEqualTo(channelB); + } + + @Test + public void defaultExecutor() throws IOException { + FakeSettings settings = + FakeSettings.createBuilder(FakeSettings.DEFAULT_CONNECTION_SETTINGS).build(); + ExecutorProvider executorProvider = settings.getExecutorProvider(); + ScheduledExecutorService executorA = executorProvider.getOrBuildExecutor(); + ScheduledExecutorService executorB = executorProvider.getOrBuildExecutor(); + Truth.assertThat(executorA).isNotEqualTo(executorB); + } + + @Test + public void fixedExecutorAutoClose() throws IOException { + thrown.expect(IllegalStateException.class); + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + FakeSettings settings = + FakeSettings.createBuilder(FakeSettings.DEFAULT_CONNECTION_SETTINGS) + .provideExecutorWith(executor, true) + .build(); + ExecutorProvider executorProvider = settings.getExecutorProvider(); + ScheduledExecutorService executorA = executorProvider.getOrBuildExecutor(); + ScheduledExecutorService executorB = executorProvider.getOrBuildExecutor(); + } + + @Test + public void fixedExecutorNoAutoClose() throws IOException { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + FakeSettings settings = + FakeSettings.createBuilder(FakeSettings.DEFAULT_CONNECTION_SETTINGS) + .provideExecutorWith(executor, false) + .build(); + ExecutorProvider executorProvider = settings.getExecutorProvider(); + ScheduledExecutorService executorA = executorProvider.getOrBuildExecutor(); + ScheduledExecutorService executorB = executorProvider.getOrBuildExecutor(); + Truth.assertThat(executorA).isEqualTo(executorB); + } +} +