From a0ca15c44150d69c3d8cc4a907ef31d831ecf0f9 Mon Sep 17 00:00:00 2001 From: "Daniel (dB.) Doubrovkine" Date: Mon, 23 Jan 2023 13:45:53 -0500 Subject: [PATCH] Refactor two-client usage. (#333) * Refactor two-client usage. Signed-off-by: dblock * Avoid double-closing the client on cleanup. Signed-off-by: dblock * Refactor constructors to take sync/async explicitly. Signed-off-by: dblock * Corrected comments and cleaned up variable naming. Signed-off-by: dblock Signed-off-by: dblock --- .../transport/aws/AwsSdk2Transport.java | 123 ++++++------------ .../aws/AwsSdk2TransportTestCase.java | 36 ++--- 2 files changed, 51 insertions(+), 108 deletions(-) diff --git a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java index c567230559ae0..94e0d3ba42190 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java @@ -38,6 +38,7 @@ import software.amazon.awssdk.http.async.AsyncExecuteRequest; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.utils.SdkAutoCloseable; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; @@ -70,8 +71,7 @@ public class AwsSdk2Transport implements OpenSearchTransport { public static final Integer DEFAULT_REQUEST_COMPRESSION_SIZE = 8192; private static final byte[] NO_BYTES = new byte[0]; - private final SdkHttpClient httpClient; - private final SdkAsyncHttpClient asyncHttpClient; + private final SdkAutoCloseable httpClient; private final String host; private final String signingServiceName; private final Region signingRegion; @@ -79,12 +79,12 @@ public class AwsSdk2Transport implements OpenSearchTransport { private final AwsSdk2TransportOptions transportOptions; /** - * Create an {@link OpenSearchTransport} with a synchronous AWS HTTP client. + * Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client. *

* Note that asynchronous OpenSearch requests sent through this transport will be dispatched * *synchronously* on the calling thread. * - * @param httpClient HTTP client to use for OpenSearch requests. + * @param asyncHttpClient Asynchronous HTTP client to use for OpenSearch requests. * @param host The fully qualified domain name to connect to. * @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`. * @param options Options that apply to all requests. Can be null. Create with @@ -92,132 +92,82 @@ public class AwsSdk2Transport implements OpenSearchTransport { * compression options, etc. */ public AwsSdk2Transport( - @Nonnull SdkHttpClient httpClient, + @CheckForNull SdkAsyncHttpClient asyncHttpClient, @Nonnull String host, @Nonnull Region signingRegion, @CheckForNull AwsSdk2TransportOptions options) { - this(httpClient, null, host, "es", signingRegion, options); + this(asyncHttpClient, host, "es", signingRegion, options); } /** * Create an {@link OpenSearchTransport} with a synchronous AWS HTTP client. - *

- * Note that asynchronous OpenSearch requests sent through this transport will be dispatched - * *synchronously* on the calling thread. * - * @param httpClient HTTP client to use for OpenSearch requests. + * @param syncHttpClient Synchronous HTTP client to use for OpenSearch requests. * @param host The fully qualified domain name to connect to. - * @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless). * @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`. * @param options Options that apply to all requests. Can be null. Create with * {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials, * compression options, etc. */ public AwsSdk2Transport( - @Nonnull SdkHttpClient httpClient, + @CheckForNull SdkHttpClient syncHttpClient, @Nonnull String host, - @Nonnull String signingServiceName, @Nonnull Region signingRegion, @CheckForNull AwsSdk2TransportOptions options) { - this(httpClient, null, host, signingServiceName, signingRegion, options); - } - - /** - * Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client - *

- * Note that synchronous OpenSearch requests sent through this transport will be dispatched - * using the asynchronous client, but the calling thread will block until they are complete. - * - * @param asyncHttpClient HTTP client to use for OpenSearch requests. - * @param host The target host. - * @param signingRegion The AWS region for which requests will be signed. This should typically match region in `host`. - * @param options Options that apply to all requests. Can be null. Create with - * {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials, - * compression options, etc. - */ - public AwsSdk2Transport( - @Nonnull SdkAsyncHttpClient asyncHttpClient, - @Nonnull String host, - @Nonnull Region signingRegion, - @CheckForNull AwsSdk2TransportOptions options) { - this(null, asyncHttpClient, host, "es", signingRegion, options); + this(syncHttpClient, host, "es", signingRegion, options); } /** * Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client. *

- * Note that synchronous OpenSearch requests sent through this transport will be dispatched - * using the asynchronous client, but the calling thread will block until they are complete. + * Note that asynchronous OpenSearch requests sent through this transport will be dispatched + * *synchronously* on the calling thread. * - * @param asyncHttpClient HTTP client to use for OpenSearch requests. - * @param host The target host. - * @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless). + * @param asyncHttpClient Asynchronous HTTP client to use for OpenSearch requests. + * @param host The fully qualified domain name to connect to. * @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`. + * @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless). * @param options Options that apply to all requests. Can be null. Create with * {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials, * compression options, etc. */ public AwsSdk2Transport( - @Nonnull SdkAsyncHttpClient asyncHttpClient, + @CheckForNull SdkAsyncHttpClient asyncHttpClient, @Nonnull String host, @Nonnull String signingServiceName, @Nonnull Region signingRegion, @CheckForNull AwsSdk2TransportOptions options) { - this(null, asyncHttpClient, host, signingServiceName, signingRegion, options); + this((SdkAutoCloseable) asyncHttpClient, host, signingServiceName, signingRegion, options); } /** - * Create an {@link OpenSearchTransport} with both synchronous and asynchronous AWS HTTP clients. - *

- * The synchronous client will be used for synchronous OpenSearch requests, and the asynchronous client - * will be used for asynchronous HTTP requests. + * Create an {@link OpenSearchTransport} with a synchronous AWS HTTP client. * - * @param httpClient HTTP client to use for OpenSearch requests. - * @param asyncHttpClient HTTP client to use for synchronous OpenSearch requests. + * @param syncHttpClient Synchronous HTTP client to use for OpenSearch requests. * @param host The fully qualified domain name to connect to. * @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`. + * @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless). * @param options Options that apply to all requests. Can be null. Create with * {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials, * compression options, etc. */ public AwsSdk2Transport( - @CheckForNull SdkHttpClient httpClient, - @CheckForNull SdkAsyncHttpClient asyncHttpClient, + @CheckForNull SdkHttpClient syncHttpClient, @Nonnull String host, + @Nonnull String signingServiceName, @Nonnull Region signingRegion, @CheckForNull AwsSdk2TransportOptions options) { - this(httpClient, asyncHttpClient, host, "es", signingRegion, options); + this((SdkAutoCloseable) syncHttpClient, host, signingServiceName, signingRegion, options); } - /** - * Create an {@link OpenSearchTransport} with both synchronous and asynchronous AWS HTTP clients. - *

- * The synchronous client will be used for synchronous OpenSearch requests, and the asynchronous client - * will be used for asynchronous HTTP requests. - * - * @param httpClient HTTP client to use for OpenSearch requests. - * @param asyncHttpClient HTTP client to use for synchronous OpenSearch requests. - * @param host The fully qualified domain name to connect to. - * @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`. - * @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless). - * @param options Options that apply to all requests. Can be null. Create with - * {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials, - * compression options, etc. - */ - public AwsSdk2Transport( - @CheckForNull SdkHttpClient httpClient, - @CheckForNull SdkAsyncHttpClient asyncHttpClient, + private AwsSdk2Transport( + @CheckForNull SdkAutoCloseable httpClient, @Nonnull String host, @Nonnull String signingServiceName, @Nonnull Region signingRegion, @CheckForNull AwsSdk2TransportOptions options) { - if (httpClient == null && asyncHttpClient == null) - { - throw new IllegalArgumentException("At least one SdkHttpClient or SdkAsyncHttpClient must be provided"); - } Objects.requireNonNull(host, "Target OpenSearch service host must not be null"); this.httpClient = httpClient; - this.asyncHttpClient = asyncHttpClient; this.host = host; this.signingServiceName = signingServiceName; this.signingRegion = signingRegion; @@ -237,11 +187,11 @@ public ResponseT performRequest( OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options); SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody); - if (httpClient != null) { - return executeSync(clientReq, endpoint, options); - } else { + if (httpClient instanceof SdkHttpClient) { + return executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options); + } else if (httpClient instanceof SdkAsyncHttpClient) { try { - return executeAsync(clientReq, requestBody, endpoint, options).get(); + return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options).get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause != null) { @@ -257,6 +207,8 @@ public ResponseT performRequest( } catch (InterruptedException e) { throw new IOException("HttpRequest was interrupted", e); } + } else { + throw new IOException("invalid httpClient: " + httpClient); } } @@ -269,11 +221,13 @@ public CompletableFuture performRequest try { OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options); SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody); - if (asyncHttpClient != null) { - return executeAsync(clientReq, requestBody, endpoint, options); - } else { - ResponseT result = executeSync(clientReq, endpoint, options); + if (httpClient instanceof SdkAsyncHttpClient) { + return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options); + } else if (httpClient instanceof SdkHttpClient) { + ResponseT result = executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options); return CompletableFuture.completedFuture(result); + } else { + throw new IOException("invalid httpClient: " + httpClient); } } catch (Throwable e) { CompletableFuture cf = new CompletableFuture<>(); @@ -418,6 +372,7 @@ private void applyOptionsHeaders(SdkHttpFullRequest.Builder builder, TransportOp } private ResponseT executeSync( + SdkHttpClient syncHttpClient, SdkHttpFullRequest httpRequest, Endpoint endpoint, TransportOptions options @@ -427,7 +382,7 @@ private ResponseT executeSync( if (httpRequest.contentStreamProvider().isPresent()) { executeRequest.contentStreamProvider(httpRequest.contentStreamProvider().get()); } - HttpExecuteResponse executeResponse = httpClient.prepareRequest(executeRequest.build()).call(); + HttpExecuteResponse executeResponse = syncHttpClient.prepareRequest(executeRequest.build()).call(); AbortableInputStream bodyStream = null; try { bodyStream = executeResponse.responseBody().orElse(null); @@ -441,6 +396,7 @@ private ResponseT executeSync( } private CompletableFuture executeAsync( + SdkAsyncHttpClient asyncHttpClient, SdkHttpFullRequest httpRequest, @CheckForNull OpenSearchRequestBodyBuffer requestBody, Endpoint endpoint, @@ -543,7 +499,6 @@ private ResponseT parseResponse( ResponseT response = (ResponseT) new BooleanResponse(bep.getResult(statusCode)); return response; } else if (endpoint instanceof JsonEndpoint) { - @SuppressWarnings("unchecked") JsonEndpoint jsonEndpoint = (JsonEndpoint) endpoint; // Successful response ResponseT response = null; diff --git a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2TransportTestCase.java b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2TransportTestCase.java index fa1d33f39a3ee..8adcd4300bd0e 100644 --- a/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2TransportTestCase.java +++ b/java-client/src/test/java/org/opensearch/client/opensearch/integTest/aws/AwsSdk2TransportTestCase.java @@ -8,7 +8,6 @@ package org.opensearch.client.opensearch.integTest.aws; - import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; @@ -42,7 +41,6 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; - public abstract class AwsSdk2TransportTestCase { public static final String TEST_INDEX = "opensearch-java-integtest"; @@ -85,16 +83,14 @@ protected OpenSearchClient getClient( getTestClusterHost(), getTestClusterServiceName(), getTestClusterRegion(), - getTransportOptions().build() - ); + getTransportOptions().build()); } else { transport = new AwsSdk2Transport( getHttpClient(), getTestClusterHost(), getTestClusterServiceName(), getTestClusterRegion(), - getTransportOptions().build() - ); + getTransportOptions().build()); } return new OpenSearchClient(transport); } @@ -111,16 +107,14 @@ protected OpenSearchAsyncClient getAsyncClient( getTestClusterHost(), getTestClusterServiceName(), getTestClusterRegion(), - getTransportOptions().build() - ); + getTransportOptions().build()); } else { transport = new AwsSdk2Transport( getHttpClient(), getTestClusterHost(), getTestClusterServiceName(), getTestClusterRegion(), - getTransportOptions().build() - ); + getTransportOptions().build()); } return new OpenSearchAsyncClient(transport); } @@ -137,16 +131,14 @@ protected OpenSearchIndicesClient getIndexesClient( getTestClusterHost(), getTestClusterServiceName(), getTestClusterRegion(), - getTransportOptions().build() - ); + getTransportOptions().build()); } else { transport = new AwsSdk2Transport( getHttpClient(), getTestClusterHost(), getTestClusterServiceName(), getTestClusterRegion(), - getTransportOptions().build() - ); + getTransportOptions().build()); } return new OpenSearchIndicesClient(transport); } @@ -171,6 +163,7 @@ public static void cleanupClients() throws IOException { if (httpClient != null) { try { httpClient.close(); + httpClient = null; } catch (Throwable e) { // Not our problem } @@ -178,6 +171,7 @@ public static void cleanupClients() throws IOException { if (asyncHttpClient != null) { try { asyncHttpClient.close(); + asyncHttpClient = null; } catch (Throwable e) { // Not our problem } @@ -204,10 +198,8 @@ public void resetTestIndex(boolean async) throws Exception { IndexState indexInfo = client.get(b -> b.index(TEST_INDEX)).get(TEST_INDEX); if (indexInfo != null) { indexExists = true; - } - } catch ( - OpenSearchException e) { + } catch (OpenSearchException e) { if (e.status() != 404) { throw e; } @@ -237,17 +229,14 @@ protected SearchResponse query(OpenSearchClient client, String title .ignoreThrottled(false) .sort( new SortOptions.Builder().score(o -> o.order(SortOrder.Desc)).build(), - new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build() - ) + new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build()) .query(query); - return client.search(req.build(), SimplePojo.class); } protected CompletableFuture> query( - OpenSearchAsyncClient client, String title, String text - ) { + OpenSearchAsyncClient client, String title, String text) { var query = Query.of(qb -> { if (title != null) { qb.match(mb -> mb.field("title").query(vb -> vb.stringValue(title))); @@ -264,8 +253,7 @@ protected CompletableFuture> query( .ignoreThrottled(false) .sort( new SortOptions.Builder().score(o -> o.order(SortOrder.Desc)).build(), - new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build() - ) + new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build()) .query(query); try {