Skip to content

Commit

Permalink
Now it's possible to configure NettyNioAsyncHttpClient in order to use a
Browse files Browse the repository at this point in the history
non blocking DNS resolver.
  • Loading branch information
martinKindall committed May 15, 2023
1 parent f12ccc7 commit 8dc8fe6
Show file tree
Hide file tree
Showing 13 changed files with 502 additions and 124 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-NettyNIOHTTPClient-35595eb.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "Netty NIO HTTP Client",
"contributor": "martinKindall",
"type": "bugfix",
"description": "By default, Netty threads are blocked during dns resolution, namely InetAddress.getByName is used under the hood. Now, there's an option to configure the NettyNioAsyncHttpClient in order to use a non blocking dns resolution strategy."
}
10 changes: 10 additions & 0 deletions bom-internal/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@
<artifactId>netty-buffer</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver-dns</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,16 @@ public final class SdkHttpConfigurationOption<T> extends AttributeMap.Key<T> {
public static final SdkHttpConfigurationOption<Duration> TLS_NEGOTIATION_TIMEOUT =
new SdkHttpConfigurationOption<>("TlsNegotiationTimeout", Duration.class);

/**
* Configure whether to use a non-blocking dns resolver or not. False by default, as netty's default dns resolver is
* blocking; it namely calls java.net.InetAddress.getByName.
* <p>
* When enabled, a non-blocking dns resolver will be used instead, by modifying netty's bootstrap configuration.
* See https://netty.io/news/2016/05/26/4-1-0-Final.html
*/
public static final SdkHttpConfigurationOption<Boolean> USE_NONBLOCKING_DNS_RESOLVER =
new SdkHttpConfigurationOption<>("UseNonBlockingDnsResolver", Boolean.class);

private static final Duration DEFAULT_SOCKET_READ_TIMEOUT = Duration.ofSeconds(30);
private static final Duration DEFAULT_SOCKET_WRITE_TIMEOUT = Duration.ofSeconds(30);
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(2);
Expand All @@ -152,6 +162,8 @@ public final class SdkHttpConfigurationOption<T> extends AttributeMap.Key<T> {
private static final TlsTrustManagersProvider DEFAULT_TLS_TRUST_MANAGERS_PROVIDER = null;
private static final TlsKeyManagersProvider DEFAULT_TLS_KEY_MANAGERS_PROVIDER = SystemPropertyTlsKeyManagersProvider.create();

private static final Boolean DEFAULT_USE_NONBLOCKING_DNS_RESOLVER = Boolean.FALSE;

public static final AttributeMap GLOBAL_HTTP_DEFAULTS = AttributeMap
.builder()
.put(READ_TIMEOUT, DEFAULT_SOCKET_READ_TIMEOUT)
Expand All @@ -169,6 +181,7 @@ public final class SdkHttpConfigurationOption<T> extends AttributeMap.Key<T> {
.put(TLS_KEY_MANAGERS_PROVIDER, DEFAULT_TLS_KEY_MANAGERS_PROVIDER)
.put(TLS_TRUST_MANAGERS_PROVIDER, DEFAULT_TLS_TRUST_MANAGERS_PROVIDER)
.put(TLS_NEGOTIATION_TIMEOUT, DEFAULT_TLS_NEGOTIATION_TIMEOUT)
.put(USE_NONBLOCKING_DNS_RESOLVER, DEFAULT_USE_NONBLOCKING_DNS_RESOLVER)
.build();

private final String name;
Expand Down
8 changes: 8 additions & 0 deletions http-clients/netty-nio-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport-classes-epoll</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver-dns</artifactId>
</dependency>

<!--Reactive Dependencies-->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,15 @@ public interface Builder extends SdkAsyncHttpClient.Builder<NettyNioAsyncHttpCli
* @return the builder for method chaining.
*/
Builder http2Configuration(Consumer<Http2Configuration.Builder> http2ConfigurationBuilderConsumer);

/**
* Configure whether to use a non-blocking dns resolver or not. False by default, as netty's default dns resolver is
* blocking; it namely calls java.net.InetAddress.getByName.
* <p>
* When enabled, a non-blocking dns resolver will be used instead, by modifying netty's bootstrap configuration.
* See https://netty.io/news/2016/05/26/4-1-0-Final.html
*/
Builder useNonBlockingDnsResolver(Boolean useNonBlockingDnsResolver);
}

/**
Expand Down Expand Up @@ -716,6 +725,16 @@ public void setHttp2Configuration(Http2Configuration http2Configuration) {
http2Configuration(http2Configuration);
}

@Override
public Builder useNonBlockingDnsResolver(Boolean useNonBlockingDnsResolver) {
standardOptions.put(SdkHttpConfigurationOption.USE_NONBLOCKING_DNS_RESOLVER, useNonBlockingDnsResolver);
return this;
}

public void setUseNonBlockingDnsResolver(Boolean useNonBlockingDnsResolver) {
useNonBlockingDnsResolver(useNonBlockingDnsResolver);
}

@Override
public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
if (standardOptions.get(SdkHttpConfigurationOption.TLS_NEGOTIATION_TIMEOUT) == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.resolver.dns.NoopDnsCache;
import java.net.InetSocketAddress;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
Expand Down Expand Up @@ -56,8 +61,21 @@ public Bootstrap createBootstrap(String host, int port) {
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyConfiguration.connectTimeoutMillis())
.option(ChannelOption.SO_KEEPALIVE, nettyConfiguration.tcpKeepAlive())
.remoteAddress(InetSocketAddress.createUnresolved(host, port));

if (nettyConfiguration.isNonBlockingResolver()) {
bootstrap.resolver(nonBlockingResolverGroup());
}

sdkChannelOptions.channelOptions().forEach(bootstrap::option);

return bootstrap;
}

private AddressResolverGroup<InetSocketAddress> nonBlockingResolverGroup() {
DnsNameResolverBuilder builder = new DnsNameResolverBuilder()
.channelType(NioDatagramChannel.class)
.resolveCache(NoopDnsCache.INSTANCE);

return new DnsAddressResolverGroup(builder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,8 @@ public boolean tcpKeepAlive() {
public Duration tlsHandshakeTimeout() {
return configuration.get(SdkHttpConfigurationOption.TLS_NEGOTIATION_TIMEOUT);
}

public boolean isNonBlockingResolver() {
return configuration.get(SdkHttpConfigurationOption.USE_NONBLOCKING_DNS_RESOLVER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import software.amazon.awssdk.http.EmptyPublisher;
import software.amazon.awssdk.http.FileStoreTlsKeyManagersProvider;
import software.amazon.awssdk.http.HttpTestUtils;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.TlsKeyManagersProvider;
Expand Down Expand Up @@ -185,6 +186,24 @@ public void nonProxy_noKeyManagerGiven_shouldThrowException() {
.hasRootCauseInstanceOf(SSLException.class);
}

@Test
public void builderUsesProvidedKeyManagersProviderNonBlockingDns() {
TlsKeyManagersProvider mockKeyManagersProvider = mock(TlsKeyManagersProvider.class);
netty = NettyNioAsyncHttpClient.builder()
.proxyConfiguration(proxyCfg)
.tlsKeyManagersProvider(mockKeyManagersProvider)
.buildWithDefaults(AttributeMap.builder()
.put(TRUST_ALL_CERTIFICATES, true)
.put(SdkHttpConfigurationOption.USE_NONBLOCKING_DNS_RESOLVER, true)
.build());

try {
sendRequest(netty, new RecordingResponseHandler());
} catch (Exception ignored) {
}
verify(mockKeyManagersProvider).keyManagers();
}

private void sendRequest(SdkAsyncHttpClient client, SdkAsyncHttpResponseHandler responseHandler) {
AsyncExecuteRequest req = AsyncExecuteRequest.builder()
.request(testSdkRequest())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.nio.netty;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.any;
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static java.util.Collections.singletonMap;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.commons.lang3.StringUtils.reverse;
import static org.assertj.core.api.Assertions.assertThat;
import static software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClientTestUtils.assertCanReceiveBasicRequest;
import static software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClientTestUtils.createProvider;
import static software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClientTestUtils.createRequest;
import static software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClientTestUtils.makeSimpleRequest;

import com.github.tomakehurst.wiremock.junit.WireMockRule;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.assertj.core.api.Condition;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.utils.AttributeMap;

@RunWith(MockitoJUnitRunner.class)
public class NettyNioAsyncHttpClientNonBlockingDnsTest {

private final RecordingNetworkTrafficListener wiremockTrafficListener = new RecordingNetworkTrafficListener();

private static final SdkAsyncHttpClient client = NettyNioAsyncHttpClient.builder()
.buildWithDefaults(
AttributeMap.builder()
.put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true)
.put(SdkHttpConfigurationOption.USE_NONBLOCKING_DNS_RESOLVER, true)
.build());

@Rule
public WireMockRule mockServer = new WireMockRule(wireMockConfig()
.dynamicPort()
.dynamicHttpsPort()
.networkTrafficListener(wiremockTrafficListener));

@Before
public void methodSetup() {
wiremockTrafficListener.reset();
}

@AfterClass
public static void tearDown() throws Exception {
client.close();
}

@Test
public void useNonBlockingDnsResolver_shouldHonor() {
try (NettyNioAsyncHttpClient client = (NettyNioAsyncHttpClient) NettyNioAsyncHttpClient.builder()
.build()) {
assertThat(client.configuration().isNonBlockingResolver()).isEqualTo(false);
}

try (NettyNioAsyncHttpClient client = (NettyNioAsyncHttpClient) NettyNioAsyncHttpClient.builder()
.useNonBlockingDnsResolver(false)
.build()) {
assertThat(client.configuration().isNonBlockingResolver()).isEqualTo(false);
}

try (NettyNioAsyncHttpClient client = (NettyNioAsyncHttpClient) NettyNioAsyncHttpClient.builder()
.useNonBlockingDnsResolver(true)
.build()) {
assertThat(client.configuration().isNonBlockingResolver()).isEqualTo(true);
}
}

@Test
public void canSendContentAndGetThatContentBackNonBlockingDns() throws Exception {
String body = randomAlphabetic(50);
stubFor(any(urlEqualTo("/echo?reversed=true"))
.withRequestBody(equalTo(body))
.willReturn(aResponse().withBody(reverse(body))));
URI uri = URI.create("http://localhost:" + mockServer.port());

SdkHttpRequest request = createRequest(uri, "/echo", body, SdkHttpMethod.POST, singletonMap("reversed", "true"));

RecordingResponseHandler recorder = new RecordingResponseHandler();

client.execute(AsyncExecuteRequest.builder().request(request).requestContentPublisher(createProvider(body)).responseHandler(recorder).build());

recorder.completeFuture.get(5, TimeUnit.SECONDS);

verify(1, postRequestedFor(urlEqualTo("/echo?reversed=true")));

assertThat(recorder.fullResponseAsString()).isEqualTo(reverse(body));
}

@Test
public void defaultThreadFactoryUsesHelpfulName() throws Exception {
// Make a request to ensure a thread is primed
makeSimpleRequest(client, mockServer);

String expectedPattern = "aws-java-sdk-NettyEventLoop-\\d+-\\d+";
assertThat(Thread.getAllStackTraces().keySet())
.areAtLeast(1, new Condition<>(t -> t.getName().matches(expectedPattern),
"Matches default thread pattern: `%s`", expectedPattern));
}

@Test
public void canMakeBasicRequestOverHttp() throws Exception {
String smallBody = randomAlphabetic(10);
URI uri = URI.create("http://localhost:" + mockServer.port());

assertCanReceiveBasicRequest(client, uri, smallBody);
}

@Test
public void canMakeBasicRequestOverHttps() throws Exception {
String smallBody = randomAlphabetic(10);
URI uri = URI.create("https://localhost:" + mockServer.httpsPort());

assertCanReceiveBasicRequest(client, uri, smallBody);
}

@Test
public void canHandleLargerPayloadsOverHttp() throws Exception {
String largishBody = randomAlphabetic(25000);

URI uri = URI.create("http://localhost:" + mockServer.port());

assertCanReceiveBasicRequest(client, uri, largishBody);
}

@Test
public void canHandleLargerPayloadsOverHttps() throws Exception {
String largishBody = randomAlphabetic(25000);

URI uri = URI.create("https://localhost:" + mockServer.httpsPort());

assertCanReceiveBasicRequest(client, uri, largishBody);
}

@Test
public void requestContentOnlyEqualToContentLengthHeaderFromProvider() throws InterruptedException, ExecutionException, TimeoutException, IOException {
final String content = randomAlphabetic(32);
final String streamContent = content + reverse(content);
stubFor(any(urlEqualTo("/echo?reversed=true"))
.withRequestBody(equalTo(content))
.willReturn(aResponse().withBody(reverse(content))));
URI uri = URI.create("http://localhost:" + mockServer.port());

SdkHttpFullRequest request = createRequest(uri, "/echo", streamContent, SdkHttpMethod.POST, singletonMap("reversed", "true"));
request = request.toBuilder().putHeader("Content-Length", Integer.toString(content.length())).build();
RecordingResponseHandler recorder = new RecordingResponseHandler();

client.execute(AsyncExecuteRequest.builder().request(request).requestContentPublisher(createProvider(streamContent)).responseHandler(recorder).build());

recorder.completeFuture.get(5, TimeUnit.SECONDS);

// HTTP servers will stop processing the request as soon as it reads
// bytes equal to 'Content-Length' so we need to inspect the raw
// traffic to ensure that there wasn't anything after that.
assertThat(wiremockTrafficListener.requests().toString()).endsWith(content);
}
}
Loading

0 comments on commit 8dc8fe6

Please sign in to comment.