Skip to content

Commit

Permalink
Add backoff retry capability in rest client (#170)
Browse files Browse the repository at this point in the history
* Add retry http client, builder and future

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add UT

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add more UT

Signed-off-by: Chen Dai <daichen@amazon.com>

* Delete IT

Signed-off-by: Chen Dai <daichen@amazon.com>

* Refactor UT

Signed-off-by: Chen Dai <daichen@amazon.com>

* Replace class name check with instanceOf check

Signed-off-by: Chen Dai <daichen@amazon.com>

* Make retryable exception list optional

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add retryable status code option and handler

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add retry enabled check

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add Spark conf and user manual

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add exception class name to Spark conf

Signed-off-by: Chen Dai <daichen@amazon.com>

* Separate failure and result handler class

Signed-off-by: Chen Dai <daichen@amazon.com>

* Refactor failure and result predicate

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add more UT

Signed-off-by: Chen Dai <daichen@amazon.com>

* Reword user manual

Signed-off-by: Chen Dai <daichen@amazon.com>

---------

Signed-off-by: Chen Dai <daichen@amazon.com>
  • Loading branch information
dai-chen authored Nov 22, 2023
1 parent d3dfca1 commit 78193f5
Show file tree
Hide file tree
Showing 12 changed files with 727 additions and 22 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ lazy val flintCore = (project in file("flint-core"))
"org.opensearch.client" % "opensearch-rest-client" % opensearchVersion,
"org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion
exclude ("org.apache.logging.log4j", "log4j-api"),
"dev.failsafe" % "failsafe" % "3.3.2",
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
Expand Down
3 changes: 3 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
- `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
- `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,502" (429 Too Many Requests and 502 Bad Gateway).
- `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown.
- `spark.flint.optimizer.enabled`: default is true.
- `spark.flint.index.hybridscan.enabled`: default is false.
- `spark.flint.index.checkpoint.mandatory`: default is true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

package org.opensearch.flint.core;

import dev.failsafe.RetryPolicy;
import java.io.Serializable;
import java.util.Map;
import org.opensearch.flint.core.http.FlintRetryOptions;

/**
* Flint Options include all the flint related configuration.
Expand All @@ -15,6 +17,11 @@ public class FlintOptions implements Serializable {

private final Map<String, String> options;

/**
* Flint options related to HTTP retry policy.
*/
private final FlintRetryOptions retryOptions;

public static final String HOST = "host";

public static final String PORT = "port";
Expand Down Expand Up @@ -68,6 +75,7 @@ public class FlintOptions implements Serializable {

/**
 * Creates Flint options backed by the given key-value map. The HTTP retry
 * options are built from the same map.
 *
 * @param options Flint option key-value pairs
 */
public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
}

public String getHost() {
Expand All @@ -88,6 +96,10 @@ public int getScrollDuration() {

/**
 * @return value of the REFRESH_POLICY option, or DEFAULT_REFRESH_POLICY if the option is absent
 */
public String getRefreshPolicy() {return options.getOrDefault(REFRESH_POLICY, DEFAULT_REFRESH_POLICY);}

/**
 * @return Flint options related to HTTP retry policy, created once in the constructor
 */
public FlintRetryOptions getRetryOptions() {
return retryOptions;
}

/**
 * @return value of the REGION option, or DEFAULT_REGION if the option is absent
 */
public String getRegion() {
return options.getOrDefault(REGION, DEFAULT_REGION);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.http;

import static java.time.temporal.ChronoUnit.SECONDS;

import dev.failsafe.RetryPolicy;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;
import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate;
import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate;

/**
 * Flint options related to HTTP request retry.
 *
 * <p>This class is serializable because {@code FlintOptions}, which implements
 * {@code Serializable}, keeps an instance of it in a non-transient field. Without this,
 * serializing {@code FlintOptions} would fail with {@code NotSerializableException}.
 * The only instance state is the option map; the retry policy itself is rebuilt on every
 * {@link #getRetryPolicy()} call and is never stored.
 */
public class FlintRetryOptions implements java.io.Serializable {

  private static final Logger LOG = Logger.getLogger(FlintRetryOptions.class.getName());

  /**
   * All Flint options.
   */
  private final Map<String, String> options;

  /**
   * Maximum retry attempt
   */
  public static final int DEFAULT_MAX_RETRIES = 3;
  public static final String MAX_RETRIES = "retry.max_retries";

  /**
   * Retryable HTTP response status codes (comma separated list)
   */
  public static final String DEFAULT_RETRYABLE_HTTP_STATUS_CODES = "429,502";
  public static final String RETRYABLE_HTTP_STATUS_CODES = "retry.http_status_codes";

  /**
   * Retryable exception class name
   */
  public static final String RETRYABLE_EXCEPTION_CLASS_NAMES = "retry.exception_class_names";

  /**
   * @param options all Flint options; retry related entries are looked up lazily by the getters
   */
  public FlintRetryOptions(Map<String, String> options) {
    this.options = options;
  }

  /**
   * Is auto retry capability enabled.
   *
   * @return true if the configured max retries is greater than zero, otherwise false.
   */
  public boolean isRetryEnabled() {
    return getMaxRetries() > 0;
  }

  /**
   * Build retry policy based on the given Flint options.
   *
   * @param <T> success execution result type
   * @return Failsafe retry policy
   */
  public <T> RetryPolicy<T> getRetryPolicy() {
    return RetryPolicy.<T>builder()
        // Backoff strategy config (can be configurable as needed in future)
        .withBackoff(1, 30, SECONDS)
        .withJitter(Duration.ofMillis(100))
        // Failure handling config from Flint options
        .withMaxRetries(getMaxRetries())
        .handleIf(ExceptionClassNameFailurePredicate.create(getRetryableExceptionClassNames()))
        .handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()))
        // Logging listener
        .onFailedAttempt(event ->
            LOG.severe("Attempt to execute request failed: " + event))
        .onRetry(ex ->
            LOG.warning("Retrying failed request at #" + ex.getAttemptCount()))
        .build();
  }

  /**
   * @return maximum retry option value, or {@link #DEFAULT_MAX_RETRIES} if not configured
   * @throws NumberFormatException if the configured value is not a valid integer
   */
  public int getMaxRetries() {
    return Integer.parseInt(
        options.getOrDefault(MAX_RETRIES, String.valueOf(DEFAULT_MAX_RETRIES)));
  }

  /**
   * @return retryable HTTP status code list, or
   *         {@link #DEFAULT_RETRYABLE_HTTP_STATUS_CODES} if not configured
   */
  public String getRetryableHttpStatusCodes() {
    return options.getOrDefault(RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_HTTP_STATUS_CODES);
  }

  /**
   * @return retryable exception class name list, or empty if not configured
   */
  public Optional<String> getRetryableExceptionClassNames() {
    return Optional.ofNullable(options.get(RETRYABLE_EXCEPTION_CLASS_NAMES));
  }

  @Override
  public String toString() {
    return "FlintRetryOptions{" +
        "maxRetries=" + getMaxRetries() +
        ", retryableStatusCodes=" + getRetryableHttpStatusCodes() +
        ", retryableExceptionClassNames=" + getRetryableExceptionClassNames() +
        '}';
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.http;

import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.protocol.HttpContext;
import org.opensearch.flint.core.FlintOptions;

/**
 * HTTP client that retries requests to tolerate transient faults.
 *
 * <p>All lifecycle calls are forwarded to the wrapped client. Retry happens lazily inside the
 * {@link Future} returned by {@code execute}: each attempt submits the request to the wrapped
 * client again and blocks on the resulting future.
 */
public class RetryableHttpAsyncClient extends CloseableHttpAsyncClient {

  private static final Logger LOG = Logger.getLogger(RetryableHttpAsyncClient.class.getName());

  /**
   * Delegated internal HTTP client that actually executes the request.
   */
  private final CloseableHttpAsyncClient internalClient;

  /**
   * Flint retry options.
   */
  private final FlintRetryOptions options;

  /**
   * @param internalClient client that performs the real HTTP request
   * @param options        retry options used to build the retry policy per request
   */
  public RetryableHttpAsyncClient(CloseableHttpAsyncClient internalClient,
                                  FlintRetryOptions options) {
    this.internalClient = internalClient;
    this.options = options;
  }

  @Override
  public boolean isRunning() {
    return internalClient.isRunning();
  }

  @Override
  public void start() {
    internalClient.start();
  }

  @Override
  public void close() throws IOException {
    internalClient.close();
  }

  /**
   * Returns a future that submits the request to the internal client only when one of its
   * {@code get} methods is called, re-submitting it according to the retry policy.
   */
  @Override
  public <T> Future<T> execute(HttpAsyncRequestProducer requestProducer,
                               HttpAsyncResponseConsumer<T> responseConsumer,
                               HttpContext context,
                               FutureCallback<T> callback) {
    return new Future<>() {
      /**
       * Delegated future object created per doExecuteAndFutureGetWithRetry() call which creates initial object too.
       * In this way, we avoid the duplicate logic of first call and subsequent retry calls.
       * Here the assumption is cancel, isCancelled and isDone never called before get().
       * (OpenSearch RestClient seems only call get() API)
       */
      private Future<T> delegate;

      @Override
      public boolean cancel(boolean mayInterruptIfRunning) {
        return delegate.cancel(mayInterruptIfRunning);
      }

      @Override
      public boolean isCancelled() {
        return delegate.isCancelled();
      }

      @Override
      public boolean isDone() {
        return delegate.isDone();
      }

      @Override
      public T get() throws InterruptedException, ExecutionException {
        return doExecuteAndFutureGetWithRetry(() -> delegate.get());
      }

      /**
       * Timed get with retry. The timeout is passed to every attempt's future, so it bounds
       * each attempt rather than the total time spent across retries.
       *
       * @throws TimeoutException if the last attempt timed out
       */
      @Override
      public T get(long timeout, TimeUnit unit)
          throws InterruptedException, ExecutionException, TimeoutException {
        try {
          return doExecuteAndFutureGetWithRetry(() -> delegate.get(timeout, unit));
        } catch (FailsafeException ex) {
          // TimeoutException is checked, so Failsafe wraps it. Unwrap it here to honor the
          // Future.get(long, TimeUnit) contract instead of leaking FailsafeException.
          Throwable cause = ex.getCause();
          if (cause instanceof TimeoutException) {
            throw (TimeoutException) cause;
          }
          throw ex;
        }
      }

      /**
       * Runs the request with retry and returns the result of the given future getter.
       *
       * @param futureGet how to fetch the result from the current delegate future
       * @throws FailsafeException if the failure cause is neither InterruptedException
       *                           nor ExecutionException
       */
      private T doExecuteAndFutureGetWithRetry(Callable<T> futureGet) throws InterruptedException, ExecutionException {
        try {
          // Retry by creating a new Future object (as new delegate) and get its result again
          return Failsafe
              .with(options.getRetryPolicy())
              .get(() -> {
                this.delegate = internalClient.execute(requestProducer, responseConsumer, context, callback);
                return futureGet.call();
              });
        } catch (FailsafeException ex) {
          LOG.severe("Request failed permanently. Re-throwing original exception.");

          // Failsafe will wrap checked exception, such as ExecutionException
          // So here we have to unwrap failsafe exception and rethrow it
          Throwable cause = ex.getCause();
          if (cause instanceof InterruptedException) {
            throw (InterruptedException) cause;
          } else if (cause instanceof ExecutionException) {
            throw (ExecutionException) cause;
          } else {
            throw ex;
          }
        }
      }
    };
  }

  /**
   * Wraps the given client builder so that the client it builds is retryable.
   *
   * @param delegate original HTTP async client builder
   * @param options  Flint options that carry the retry options
   * @return the original builder if retry is disabled, otherwise a builder whose
   *         {@code build()} wraps the delegate's client in a {@link RetryableHttpAsyncClient}
   */
  public static HttpAsyncClientBuilder builder(HttpAsyncClientBuilder delegate, FlintOptions options) {
    FlintRetryOptions retryOptions = options.getRetryOptions();
    if (!retryOptions.isRetryEnabled()) {
      return delegate;
    }

    // Wrap original builder so created client will be wrapped by retryable client too
    return new HttpAsyncClientBuilder() {
      @Override
      public CloseableHttpAsyncClient build() {
        LOG.info("Building retryable http async client with options: " + retryOptions);
        return new RetryableHttpAsyncClient(delegate.build(), retryOptions);
      }
    };
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.http.handler;

import dev.failsafe.function.CheckedPredicate;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;

/**
 * Failure predicate that decides retryability by walking the cause chain of an error.
 */
public abstract class ErrorStacktraceFailurePredicate implements CheckedPredicate<Throwable> {

  private static final Logger LOG = Logger.getLogger(ErrorStacktraceFailurePredicate.class.getName());

  /**
   * Walks from the given throwable through its causes and asks the subclass about each one.
   * Returns true as soon as any of them is reported retryable.
   */
  @Override
  public boolean test(Throwable throwable) throws Throwable {
    // Track visited exceptions so a cyclic cause chain cannot loop forever
    Set<Throwable> visited = new HashSet<>();

    for (Throwable current = throwable;
         current != null && visited.add(current);
         current = current.getCause()) {
      LOG.info("Checking if exception retryable: " + current);

      if (!isRetryable(current)) {
        continue;
      }

      LOG.info("Exception is retryable: " + current);
      return true;
    }

    LOG.info("No retryable exception found on the stacktrace");
    return false;
  }

  /**
   * Subclass hook that decides whether a single exception is retryable.
   */
  protected abstract boolean isRetryable(Throwable throwable);
}
Loading

0 comments on commit 78193f5

Please sign in to comment.