Skip to content

Commit

Permalink
Add client configuration overriding of SCHEDULED_EXECUTOR_SERVICE opt…
Browse files Browse the repository at this point in the history
…ion (#4002)

* Add client configuration overriding of SCHEDULED_EXECUTOR_SERVICE option

* review

* review
  • Loading branch information
scrocquesel authored May 19, 2023
1 parent 637de29 commit c7a1786
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 5 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-c5f66b2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "scrocquesel",
"description": "Add client configuration overriding of SCHEDULED_EXECUTOR_SERVICE option"
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import software.amazon.awssdk.profiles.ProfileFileSystemSetting;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.Either;
import software.amazon.awssdk.utils.ScheduledExecutorUtils;
import software.amazon.awssdk.utils.ThreadFactoryBuilder;
import software.amazon.awssdk.utils.Validate;

Expand Down Expand Up @@ -222,6 +223,7 @@ private SdkClientConfiguration setOverrides(SdkClientConfiguration configuration

SdkClientConfiguration.Builder builder = configuration.toBuilder();

builder.option(SCHEDULED_EXECUTOR_SERVICE, clientOverrideConfiguration.scheduledExecutorService().orElse(null));
builder.option(EXECUTION_INTERCEPTORS, clientOverrideConfiguration.executionInterceptors());
builder.option(RETRY_POLICY, clientOverrideConfiguration.retryPolicy().orElse(null));
builder.option(ADDITIONAL_HTTP_HEADERS, clientOverrideConfiguration.headers());
Expand Down Expand Up @@ -313,7 +315,7 @@ private SdkClientConfiguration finalizeAsyncConfiguration(SdkClientConfiguration
private SdkClientConfiguration finalizeConfiguration(SdkClientConfiguration config) {
RetryPolicy retryPolicy = resolveRetryPolicy(config);
return config.toBuilder()
.option(SCHEDULED_EXECUTOR_SERVICE, resolveScheduledExecutorService())
.option(SCHEDULED_EXECUTOR_SERVICE, resolveScheduledExecutorService(config))
.option(EXECUTION_INTERCEPTORS, resolveExecutionInterceptors(config))
.option(RETRY_POLICY, retryPolicy)
.option(CLIENT_USER_AGENT, resolveClientUserAgent(config, retryPolicy))
Expand Down Expand Up @@ -410,9 +412,17 @@ private Executor resolveAsyncFutureCompletionExecutor(SdkClientConfiguration con
* Finalize the internal SDK scheduled executor service that is used for scheduling tasks such
* as async retry attempts and timeout task.
*/
private ScheduledExecutorService resolveScheduledExecutorService() {
return Executors.newScheduledThreadPool(5, new ThreadFactoryBuilder()
.threadNamePrefix("sdk-ScheduledExecutor").build());
private ScheduledExecutorService resolveScheduledExecutorService(SdkClientConfiguration config) {
Supplier<ScheduledExecutorService> defaultScheduledExecutor = () -> {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5, new ThreadFactoryBuilder()
.threadNamePrefix("sdk-ScheduledExecutor").build());

return executor;
};

return Optional.ofNullable(config.option(SCHEDULED_EXECUTOR_SERVICE))
.map(ScheduledExecutorUtils::unmanagedScheduledExecutor)
.orElseGet(defaultScheduledExecutor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.annotations.ToBuilderIgnoreField;
Expand Down Expand Up @@ -62,6 +63,7 @@ public final class ClientOverrideConfiguration
private final String defaultProfileName;
private final List<MetricPublisher> metricPublishers;
private final ExecutionAttributes executionAttributes;
private final ScheduledExecutorService scheduledExecutorService;

/**
* Initialize this configuration. Private to require use of {@link #builder()}.
Expand All @@ -77,6 +79,7 @@ private ClientOverrideConfiguration(Builder builder) {
this.defaultProfileName = builder.defaultProfileName();
this.metricPublishers = Collections.unmodifiableList(new ArrayList<>(builder.metricPublishers()));
this.executionAttributes = ExecutionAttributes.unmodifiableExecutionAttributes(builder.executionAttributes());
this.scheduledExecutorService = builder.scheduledExecutorService();
}

@Override
Expand All @@ -92,7 +95,8 @@ public Builder toBuilder() {
.defaultProfileFile(defaultProfileFile)
.defaultProfileName(defaultProfileName)
.executionAttributes(executionAttributes)
.metricPublishers(metricPublishers);
.metricPublishers(metricPublishers)
.scheduledExecutorService(scheduledExecutorService);
}

/**
Expand Down Expand Up @@ -141,6 +145,17 @@ public List<ExecutionInterceptor> executionInterceptors() {
return executionInterceptors;
}

/**
* The optional scheduled executor service that should be used for scheduling tasks such as async retry attempts
* and timeout task.
* <p>
* <b>The SDK will not automatically close the executor when the client is closed. It is the responsibility of the
* user to manually close the executor once all clients utilizing it have been closed.</b>
*/
public Optional<ScheduledExecutorService> scheduledExecutorService() {
return Optional.ofNullable(scheduledExecutorService);
}

/**
* The amount of time to allow the client to complete the execution of an API call. This timeout covers the entire client
* execution except for marshalling. This includes request handler execution, all HTTP requests including retries,
Expand Down Expand Up @@ -226,6 +241,7 @@ public String toString() {
.add("advancedOptions", advancedOptions)
.add("profileFile", defaultProfileFile)
.add("profileName", defaultProfileName)
.add("scheduledExecutorService", scheduledExecutorService)
.build();
}

Expand Down Expand Up @@ -338,6 +354,20 @@ default Builder retryPolicy(RetryMode retryMode) {

List<ExecutionInterceptor> executionInterceptors();

/**
* Configure the scheduled executor service that should be used for scheduling tasks such as async retry attempts
* and timeout task.
*
* <p>
* <b>The SDK will not automatically close the executor when the client is closed. It is the responsibility of the
* user to manually close the executor once all clients utilizing it have been closed.</b>
*
* @see ClientOverrideConfiguration#scheduledExecutorService()
*/
Builder scheduledExecutorService(ScheduledExecutorService scheduledExecutorService);

ScheduledExecutorService scheduledExecutorService();

/**
* Configure an advanced override option. These values are used very rarely, and the majority of SDK customers can ignore
* them.
Expand Down Expand Up @@ -499,6 +529,7 @@ private static final class DefaultClientOverrideConfigurationBuilder implements
private String defaultProfileName;
private List<MetricPublisher> metricPublishers = new ArrayList<>();
private ExecutionAttributes.Builder executionAttributes = ExecutionAttributes.builder();
private ScheduledExecutorService scheduledExecutorService;

@Override
public Builder headers(Map<String, List<String>> headers) {
Expand Down Expand Up @@ -561,6 +592,18 @@ public List<ExecutionInterceptor> executionInterceptors() {
return Collections.unmodifiableList(executionInterceptors);
}

@Override
public ScheduledExecutorService scheduledExecutorService()
{
return scheduledExecutorService;
}

@Override
public Builder scheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService = scheduledExecutorService;
return this;
}

@Override
public <T> Builder putAdvancedOption(SdkAdvancedClientOption<T> option, T value) {
this.advancedOptions.put(option, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static software.amazon.awssdk.core.client.config.SdkClientOption.PROFILE_FILE_SUPPLIER;
import static software.amazon.awssdk.core.client.config.SdkClientOption.PROFILE_NAME;
import static software.amazon.awssdk.core.client.config.SdkClientOption.RETRY_POLICY;
import static software.amazon.awssdk.core.client.config.SdkClientOption.SCHEDULED_EXECUTOR_SERVICE;
import static software.amazon.awssdk.core.internal.SdkInternalTestAdvancedClientOption.ENDPOINT_OVERRIDDEN_OVERRIDE;

import java.beans.BeanInfo;
Expand All @@ -52,6 +53,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import org.assertj.core.api.Assertions;
import org.junit.Before;
Expand All @@ -76,6 +79,7 @@
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.profiles.ProfileFile;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.ScheduledExecutorUtils.UnmanagedScheduledExecutorService;
import software.amazon.awssdk.utils.StringInputStream;

/**
Expand Down Expand Up @@ -132,6 +136,7 @@ public void overrideConfigurationReturnsSetValues() {
.type(ProfileFile.Type.CONFIGURATION)
.build();
String profileName = "name";
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder()
.executionInterceptors(interceptors)
Expand All @@ -148,6 +153,7 @@ public void overrideConfigurationReturnsSetValues() {
.metricPublishers(metricPublishers)
.executionAttributes(executionAttributes)
.putAdvancedOption(ENDPOINT_OVERRIDDEN_OVERRIDE, Boolean.TRUE)
.scheduledExecutorService(scheduledExecutorService)
.build();

TestClientBuilder builder = testClientBuilder().overrideConfiguration(overrideConfig);
Expand All @@ -166,6 +172,7 @@ public void overrideConfigurationReturnsSetValues() {
assertThat(builderOverrideConfig.metricPublishers()).isEqualTo(metricPublishers);
assertThat(builderOverrideConfig.executionAttributes().getAttributes()).isEqualTo(executionAttributes.getAttributes());
assertThat(builderOverrideConfig.advancedOption(ENDPOINT_OVERRIDDEN_OVERRIDE)).isEqualTo(Optional.of(Boolean.TRUE));
assertThat(builderOverrideConfig.scheduledExecutorService().get()).isEqualTo(scheduledExecutorService);
}

@Test
Expand All @@ -189,6 +196,7 @@ public void overrideConfigurationOmitsUnsetValues() {
assertThat(builderOverrideConfig.metricPublishers()).isEmpty();
assertThat(builderOverrideConfig.executionAttributes().getAttributes()).isEmpty();
assertThat(builderOverrideConfig.advancedOption(ENDPOINT_OVERRIDDEN_OVERRIDE)).isEmpty();
assertThat(builderOverrideConfig.scheduledExecutorService()).isEmpty();
}

@Test
Expand All @@ -198,6 +206,7 @@ public void buildIncludesClientOverrides() {
interceptors.add(interceptor);

RetryPolicy retryPolicy = RetryPolicy.builder().build();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

Map<String, List<String>> headers = new HashMap<>();
List<String> headerValues = new ArrayList<>();
Expand Down Expand Up @@ -247,6 +256,7 @@ public void close() {
.metricPublishers(metricPublishers)
.executionAttributes(executionAttributes)
.putAdvancedOption(ENDPOINT_OVERRIDDEN_OVERRIDE, Boolean.TRUE)
.scheduledExecutorService(scheduledExecutorService)
.build();

SdkClientConfiguration config =
Expand All @@ -267,6 +277,9 @@ public void close() {
assertThat(config.option(METRIC_PUBLISHERS)).contains(metricPublisher);
assertThat(config.option(EXECUTION_ATTRIBUTES).getAttribute(execAttribute)).isEqualTo("value");
assertThat(config.option(ENDPOINT_OVERRIDDEN)).isEqualTo(Boolean.TRUE);
UnmanagedScheduledExecutorService customScheduledExecutorService =
(UnmanagedScheduledExecutorService) config.option(SCHEDULED_EXECUTOR_SERVICE);
assertThat(customScheduledExecutorService.scheduledExecutorService()).isEqualTo(scheduledExecutorService);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpClient;
Expand Down Expand Up @@ -83,6 +85,16 @@ public void executorFromBuilderNotShutdown() {
verify(executor, never()).shutdownNow();
}

@Test
public void scheduledExecutorFromBuilderNotShutdown() {
ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);

asyncClientBuilder().overrideConfiguration(c -> c.scheduledExecutorService(scheduledExecutorService)).build().close();

verify(scheduledExecutorService, never()).shutdown();
verify(scheduledExecutorService, never()).shutdownNow();
}

public ProtocolRestJsonClientBuilder syncClientBuilder() {
return ProtocolRestJsonClient.builder()
.region(Region.US_EAST_1)
Expand Down
Loading

0 comments on commit c7a1786

Please sign in to comment.