From 309bcd736e0c7af590b3d4ed08583bdc82016fc3 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Wed, 4 Dec 2024 15:31:08 -0800 Subject: [PATCH] add handler for aoss only Signed-off-by: Sean Kao --- .../flint/core/http/FlintRetryOptions.java | 22 +++++++++----- .../http/handler/HttpResultPredicate.java | 29 ------------------- .../http/RetryableHttpAsyncClientSuite.scala | 25 +++++++++++----- 3 files changed, 31 insertions(+), 45 deletions(-) delete mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java index 818ebe9d1..85c2d535c 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java @@ -6,8 +6,12 @@ package org.opensearch.flint.core.http; import static java.time.temporal.ChronoUnit.SECONDS; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_AOSS; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_ES; import dev.failsafe.RetryPolicy; +import dev.failsafe.RetryPolicyBuilder; import dev.failsafe.event.ExecutionAttemptedEvent; import dev.failsafe.function.CheckedPredicate; import java.time.Duration; @@ -17,7 +21,6 @@ import org.opensearch.action.bulk.BulkResponse; import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate; import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate; -import org.opensearch.flint.core.http.handler.HttpResultPredicate; import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate; import java.io.Serializable; @@ -33,6 +36,8 @@ public class FlintRetryOptions implements Serializable { */ private final Map options; + private final boolean isAOSS; + /** * Maximum retry attempt */ @@ -49,6 +54,7 @@ public class FlintRetryOptions implements Serializable { public FlintRetryOptions(Map options) { this.options = options; + this.isAOSS = options.getOrDefault(SERVICE_NAME, SERVICE_NAME_ES) == SERVICE_NAME_AOSS; } /** @@ -67,21 +73,21 @@ public boolean isRetryEnabled() { * @return Failsafe retry policy */ public RetryPolicy getRetryPolicy() { - return RetryPolicy.builder() + RetryPolicyBuilder builder = RetryPolicy.builder() // Backoff strategy config (can be configurable as needed in future) .withBackoff(1, 30, SECONDS) .withJitter(Duration.ofMillis(100)) // Failure handling config from Flint options .withMaxRetries(getMaxRetries()) .handleIf(ExceptionClassNameFailurePredicate.create(getRetryableExceptionClassNames())) - .handleResultIf( - new HttpResultPredicate<>( - new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()), - new HttpAOSSResultPredicate<>())) + .handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes())) // Logging listener .onFailedAttempt(FlintRetryOptions::onFailure) - .onRetry(FlintRetryOptions::onRetry) - .build(); + .onRetry(FlintRetryOptions::onRetry); + if (isAOSS) { + builder.handleResultIf(new HttpAOSSResultPredicate<>()); + } + return builder.build(); } public RetryPolicy getBulkRetryPolicy(CheckedPredicate resultPredicate) { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java deleted file mode 100644 index a991acf0a..000000000 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.core.http.handler; - -import dev.failsafe.function.CheckedPredicate; - -/** - * Failure handler based on HTTP response. - * - * @param result type (supposed to be HttpResponse for OS client) - */ -public class HttpResultPredicate implements CheckedPredicate { - - private final HttpStatusCodeResultPredicate statusCodePredicate; - private final HttpAOSSResultPredicate responseMessagePredicate; - - public HttpResultPredicate(HttpStatusCodeResultPredicate statusCodePredicate, HttpAOSSResultPredicate responseMessagePredicate) { - this.statusCodePredicate = statusCodePredicate; - this.responseMessagePredicate = responseMessagePredicate; - } - - @Override - public boolean test(T result) throws Throwable { - return statusCodePredicate.test(result) || responseMessagePredicate.test(result); - } -} diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala index a1dd9dfb3..8a8927920 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala @@ -83,14 +83,6 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with } } - it should "retry if response message contains retryable message" in { - retryableClient - .whenResponse( - 400, - "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") - .shouldExecute(times(DEFAULT_MAX_RETRIES + 1)) - } - it should "not retry if response code is not on the retryable status code list" in { retryableClient .whenStatusCode(400) @@ -163,6 +155,23 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with expectFutureGetTimes = times(0)) } + it should "retry if AOSS response is retryable" in { + retryableClient + .withOption("auth.servicename", "aoss") + .whenResponse( + 400, + "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") + .shouldExecute(times(DEFAULT_MAX_RETRIES + 1)) + } + + it should "not apply retry policy for AOSS response if service is not AOSS" in { + retryableClient + .whenResponse( + 400, + "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") + .shouldExecute(times(1)) + } + private def retryableClient: AssertionHelper = new AssertionHelper class AssertionHelper {