Skip to content

Commit

Permalink
check retryable response only if 400
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <seankao@amazon.com>
  • Loading branch information
seankao-az committed Dec 4, 2024
1 parent 54c426f commit 4212dac
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.util.logging.Logger;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate;
import org.opensearch.flint.core.http.handler.HttpResponseMessageResultPredicate;
import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate;
import org.opensearch.flint.core.http.handler.HttpResultPredicate;
import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate;
import java.io.Serializable;
Expand All @@ -42,9 +42,6 @@ public class FlintRetryOptions implements Serializable {
public static final String DEFAULT_RETRYABLE_HTTP_STATUS_CODES = "429,502";
public static final String RETRYABLE_HTTP_STATUS_CODES = "retry.http_status_codes";

public static final String DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES = "resource_already_exists_exception";
public static final String RETRYABLE_HTTP_RESPONSE_MESSAGES = "retry.http_response_messages";

/**
* Retryable exception class name
*/
Expand Down Expand Up @@ -80,7 +77,7 @@ public <T> RetryPolicy<T> getRetryPolicy() {
.handleResultIf(
new HttpResultPredicate<>(
new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()),
new HttpResponseMessageResultPredicate<>(getRetryableHttpResponseMessages())))
new HttpAOSSResultPredicate<>()))
// Logging listener
.onFailedAttempt(FlintRetryOptions::onFailure)
.onRetry(FlintRetryOptions::onRetry)
Expand Down Expand Up @@ -124,13 +121,6 @@ public String getRetryableHttpStatusCodes() {
return options.getOrDefault(RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_HTTP_STATUS_CODES);
}

/**
* @return retryable HTTP response message list
*/
public String getRetryableHttpResponseMessages() {
return options.getOrDefault(RETRYABLE_HTTP_RESPONSE_MESSAGES, DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES);
}

/**
* @return retryable exception class name list
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.http.handler;

import dev.failsafe.function.CheckedPredicate;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.util.EntityUtils;

import java.util.Arrays;
import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
* Failure handler based on HTTP response from AOSS.
*
* @param <T> result type (supposed to be HttpResponse for OS client)
*/
public class HttpAOSSResultPredicate<T> implements CheckedPredicate<T> {

private static final Logger LOG = Logger.getLogger(HttpAOSSResultPredicate.class.getName());

public static final int BAD_REQUEST_STATUS_CODE = 400;
public static final String RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE = "resource_already_exists_exception";

public HttpAOSSResultPredicate() { }

@Override
public boolean test(T result) throws Throwable {
LOG.info("Checking if response is retryable");

int statusCode = ((HttpResponse) result).getStatusLine().getStatusCode();
if (statusCode != BAD_REQUEST_STATUS_CODE) {
LOG.info("Status code " + statusCode + " is not " + BAD_REQUEST_STATUS_CODE + ". Check result: false");
return false;
}

HttpResponse response = (HttpResponse) result;
HttpEntity entity = response.getEntity();
if (entity == null) {
LOG.info("No response entity found. Check result: false");
return false;
}

// Buffer the entity to make it repeatable
BufferedHttpEntity bufferedEntity = new BufferedHttpEntity(entity);
response.setEntity(bufferedEntity);

try {
String responseContent = EntityUtils.toString(bufferedEntity);
// Reset the entity's content
bufferedEntity.getContent().reset();

boolean isRetryable = responseContent.contains(RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE);

LOG.info("Check retryable response result: " + isRetryable);
return isRetryable;
} catch (Exception e) {
LOG.info("Unable to parse response body. Check result: false");
return false;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
public class HttpResultPredicate<T> implements CheckedPredicate<T> {

private final HttpStatusCodeResultPredicate<T> statusCodePredicate;
private final HttpResponseMessageResultPredicate<T> responseMessagePredicate;
private final HttpAOSSResultPredicate<T> responseMessagePredicate;

public HttpResultPredicate(HttpStatusCodeResultPredicate<T> statusCodePredicate, HttpResponseMessageResultPredicate<T> responseMessagePredicate) {
public HttpResultPredicate(HttpStatusCodeResultPredicate<T> statusCodePredicate, HttpAOSSResultPredicate<T> responseMessagePredicate) {
this.statusCodePredicate = statusCodePredicate;
this.responseMessagePredicate = responseMessagePredicate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with

it should "retry if response message contains retryable message" in {
retryableClient
.whenResponseMessage(
.whenResponse(
400,
"OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,")
.shouldExecute(times(DEFAULT_MAX_RETRIES + 1))
}
Expand Down Expand Up @@ -184,11 +185,12 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
this
}

def whenResponseMessage(responseMessage: String): AssertionHelper = {
def whenResponse(statusCode: Int, responseMessage: String): AssertionHelper = {
val entity = mock[HttpEntity](RETURNS_DEEP_STUBS)
mockStatic(classOf[EntityUtils])
when(EntityUtils.toString(any[HttpEntity])).thenReturn(responseMessage)
val response = mock[HttpResponse](RETURNS_DEEP_STUBS)
when(response.getStatusLine.getStatusCode).thenReturn(statusCode)
when(response.getEntity).thenReturn(entity)
when(future.get()).thenReturn(response)
this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,6 @@ object FlintSparkConf {
.doc("retryable HTTP response status code list")
.createWithDefault(FlintRetryOptions.DEFAULT_RETRYABLE_HTTP_STATUS_CODES)

val RETRYABLE_HTTP_RESPONSE_MESSAGES =
FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_HTTP_RESPONSE_MESSAGES}")
.datasourceOption()
.doc("retryable HTTP response message list")
.createWithDefault(FlintRetryOptions.DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES)

val RETRYABLE_EXCEPTION_CLASS_NAMES =
FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_EXCEPTION_CLASS_NAMES}")
.datasourceOption()
Expand Down Expand Up @@ -339,7 +333,6 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
AUTH,
MAX_RETRIES,
RETRYABLE_HTTP_STATUS_CODES,
RETRYABLE_HTTP_RESPONSE_MESSAGES,
BULK_REQUEST_RATE_LIMIT_PER_NODE,
REGION,
CUSTOM_AWS_CREDENTIALS_PROVIDER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class FlintSparkConfSuite extends FlintSuite {
val retryOptions = FlintSparkConf().flintOptions().getRetryOptions
retryOptions.getMaxRetries shouldBe DEFAULT_MAX_RETRIES
retryOptions.getRetryableHttpStatusCodes shouldBe DEFAULT_RETRYABLE_HTTP_STATUS_CODES
retryOptions.getRetryableHttpResponseMessages shouldBe DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES
retryOptions.getRetryableExceptionClassNames shouldBe Optional.empty
}

Expand All @@ -56,14 +55,12 @@ class FlintSparkConfSuite extends FlintSuite {
Map(
"retry.max_retries" -> "5",
"retry.http_status_codes" -> "429,502,503,504",
"retry.http_response_messages" -> "message1,message2",
"retry.exception_class_names" -> "java.net.ConnectException").asJava)
.flintOptions()
.getRetryOptions

retryOptions.getMaxRetries shouldBe 5
retryOptions.getRetryableHttpStatusCodes shouldBe "429,502,503,504"
retryOptions.getRetryableHttpResponseMessages shouldBe "message1,message2"
retryOptions.getRetryableExceptionClassNames.get() shouldBe "java.net.ConnectException"
}

Expand Down

0 comments on commit 4212dac

Please sign in to comment.