Skip to content

Commit

Permalink
refactor: reduce StandardHttpClient retry complexity
Browse files Browse the repository at this point in the history
- Abstracts the retryWithExponentialBackoff method so that it can be reused
- Provides specific testing for all scenarios

Signed-off-by: Marc Nuri <marc@marcnuri.com>
  • Loading branch information
manusa committed May 22, 2023
1 parent 6d7db80 commit c984cff
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.fabric8.kubernetes.client.http.AsyncBody.Consumer;
import io.fabric8.kubernetes.client.http.Interceptor.RequestTags;
import io.fabric8.kubernetes.client.http.WebSocket.Listener;
import io.fabric8.kubernetes.client.utils.AsyncUtils;
import io.fabric8.kubernetes.client.utils.ExponentialBackoffIntervalCalculator;
import io.fabric8.kubernetes.client.utils.Utils;
import org.slf4j.Logger;
Expand All @@ -34,18 +35,16 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;

public abstract class StandardHttpClient<C extends HttpClient, F extends HttpClient.Factory, T extends StandardHttpClientBuilder<C, F, ?>>
implements HttpClient, RequestTags {

// pads the fail-safe timeout to ensure we don't inadvertently timeout a request
private static final long ADDITIONAL_REQEUST_TIMEOUT = TimeUnit.SECONDS.toMillis(5);
private static final long MAX_ADDITIONAL_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(5);

private static final Logger LOG = LoggerFactory.getLogger(StandardHttpClient.class);

Expand Down Expand Up @@ -81,13 +80,12 @@ public <V> CompletableFuture<HttpResponse<V>> sendAsync(HttpRequest request, Cla

@Override
public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest request, Consumer<List<ByteBuffer>> consumer) {
CompletableFuture<HttpResponse<AsyncBody>> result = new CompletableFuture<>();
StandardHttpRequest standardHttpRequest = (StandardHttpRequest) request;

retryWithExponentialBackoff(result, () -> consumeBytesOnce(standardHttpRequest, consumer), request.uri(),
HttpResponse::code,
r -> r.body().cancel(), standardHttpRequest.getTimeout());
return result;
final StandardHttpRequest standardHttpRequest = (StandardHttpRequest) request;
return retryWithExponentialBackoff(
standardHttpRequest,
() -> consumeBytesOnce(standardHttpRequest, consumer),
r -> r.body().cancel(),
HttpResponse::code);
}

private CompletableFuture<HttpResponse<AsyncBody>> consumeBytesOnce(StandardHttpRequest standardHttpRequest,
Expand Down Expand Up @@ -143,69 +141,48 @@ private CompletableFuture<HttpResponse<AsyncBody>> consumeBytesOnce(StandardHttp
};
}

public <V> CompletableFuture<V> orTimeout(CompletableFuture<V> future, Duration timeout) {
if (timeout != null && !timeout.isNegative() && !timeout.isZero()) {
long millis = timeout.toMillis();
millis += (Math.min(millis, ADDITIONAL_REQEUST_TIMEOUT));
Future<?> scheduled = Utils.schedule(Runnable::run, () -> future.completeExceptionally(new TimeoutException()),
millis, TimeUnit.MILLISECONDS);
future.whenComplete((v, t) -> scheduled.cancel(true));
}
return future;
}

/**
* Will retry the action if needed based upon the retry settings provided by the ExponentialBackoffIntervalCalculator.
*/
protected <V> void retryWithExponentialBackoff(CompletableFuture<V> result,
Supplier<CompletableFuture<V>> action, URI uri, Function<V, Integer> codeExtractor,
java.util.function.Consumer<V> cancel, ExponentialBackoffIntervalCalculator retryIntervalCalculator,
Duration timeout) {

orTimeout(action.get(), timeout)
.whenComplete((response, throwable) -> {
if (retryIntervalCalculator.shouldRetry() && !result.isDone()) {
long retryInterval = retryIntervalCalculator.nextReconnectInterval();
boolean retry = false;
if (response != null) {
Integer code = codeExtractor.apply(response);
if (code != null && code >= 500) {
LOG.debug("HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis",
uri, code, retryInterval);
retry = true;
cancel.accept(response);
}
} else {
if (throwable instanceof CompletionException) {
throwable = ((CompletionException) throwable).getCause();
}
if (throwable instanceof IOException) {
LOG.debug(String.format("HTTP operation on url: %s should be retried after %d millis because of IOException",
uri, retryInterval), throwable);
retry = true;
}
private <V> CompletableFuture<V> retryWithExponentialBackoff(
StandardHttpRequest request, Supplier<CompletableFuture<V>> action, java.util.function.Consumer<V> onCancel,
ToIntFunction<V> codeExtractor) {
final URI uri = request.uri();
final RequestConfig requestConfig = getTag(RequestConfig.class);
final ExponentialBackoffIntervalCalculator retryIntervalCalculator = ExponentialBackoffIntervalCalculator
.from(requestConfig);
final Duration timeout;
if (request.getTimeout() != null && !request.getTimeout().isNegative() && !request.getTimeout().isZero()) {
timeout = request.getTimeout().plusMillis(Math.min(request.getTimeout().toMillis(), MAX_ADDITIONAL_REQUEST_TIMEOUT));
} else {
timeout = null;
}
return AsyncUtils.retryWithExponentialBackoff(action, onCancel, timeout, retryIntervalCalculator,
(response, throwable, retryInterval) -> {
if (response != null) {
final int code = codeExtractor.applyAsInt(response);
if (code >= 500) {
LOG.debug(
"HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis",
uri, code, retryInterval);
return true;
}
} else {
if (throwable instanceof CompletionException) {
throwable = throwable.getCause();
}
if (retry) {
Utils.schedule(Runnable::run,
() -> retryWithExponentialBackoff(result, action, uri, codeExtractor, cancel, retryIntervalCalculator,
timeout),
retryInterval,
TimeUnit.MILLISECONDS);
return;
if (throwable instanceof IOException) {
LOG.debug(
String.format("HTTP operation on url: %s should be retried after %d millis because of IOException",
uri, retryInterval),
throwable);
return true;
}
}
completeOrCancel(cancel, result).accept(response, throwable);
return false;
});
}

protected <V> void retryWithExponentialBackoff(CompletableFuture<V> result,
Supplier<CompletableFuture<V>> action, URI uri, Function<V, Integer> codeExtractor,
java.util.function.Consumer<V> cancel, Duration timeout) {
RequestConfig requestConfig = getTag(RequestConfig.class);
retryWithExponentialBackoff(result, action, uri, codeExtractor, cancel,
ExponentialBackoffIntervalCalculator.from(requestConfig), timeout);
}

@Override
public io.fabric8.kubernetes.client.http.WebSocket.Builder newWebSocketBuilder() {
return new StandardWebSocketBuilder(this);
Expand All @@ -220,13 +197,11 @@ public HttpRequest.Builder newHttpRequestBuilder() {
final CompletableFuture<WebSocket> buildWebSocket(StandardWebSocketBuilder standardWebSocketBuilder,
Listener listener) {

CompletableFuture<WebSocketResponse> intermediate = new CompletableFuture<>();
StandardHttpRequest request = standardWebSocketBuilder.asHttpRequest();

retryWithExponentialBackoff(intermediate, () -> buildWebSocketOnce(standardWebSocketBuilder, listener),
request.uri(),
r -> Optional.of(r.webSocketUpgradeResponse).map(HttpResponse::code).orElse(null),
r -> Optional.ofNullable(r.webSocket).ifPresent(w -> w.sendClose(1000, null)), request.getTimeout());
final CompletableFuture<WebSocketResponse> intermediate = retryWithExponentialBackoff(
standardWebSocketBuilder.asHttpRequest(),
() -> buildWebSocketOnce(standardWebSocketBuilder, listener),
r -> Optional.ofNullable(r.webSocket).ifPresent(w -> w.sendClose(1000, null)),
r -> Optional.of(r.webSocketUpgradeResponse).map(HttpResponse::code).orElse(null));

CompletableFuture<WebSocket> result = new CompletableFuture<>();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.utils;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class AsyncUtils {

private AsyncUtils() {
}

/**
* Returns the provided {@link CompletableFuture} that will complete exceptionally with a {@link TimeoutException}
* if the provided {@link Duration} timeout period is exceeded.
*
* @param future the future to add a timeout to.
* @param timeout the timeout duration.
* @return the provided future with a timeout.
* @param <T> the result type returned by the future.
*/
public static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, Duration timeout) {
if (timeout != null && timeout.toMillis() > 0) {
final Future<?> scheduled = Utils.schedule(Runnable::run, () -> future.completeExceptionally(new TimeoutException()),
timeout.toMillis(), TimeUnit.MILLISECONDS);
future.whenComplete((v, t) -> scheduled.cancel(true));
}
return future;
}

/**
* Returns a new {@link CompletableFuture} that will complete once the action provided by the action supplier completes.
* The action will be retried with an exponential backoff using the {@link ExponentialBackoffIntervalCalculator} as
* long as the {@link ShouldRetry} predicate returns true.
* Each action retrieval retry will time out after the provided timeout {@link Duration}.
*
* @param action the action supplier.
* @param onCancel consumes the intermediate result in case the returned future is cancelled or each time the action is
* retried.
* @param timeout the timeout duration.
* @param retryIntervalCalculator the retry interval calculator.
* @param shouldRetry the predicate to compute if the action is to be retried.
* @return a new {@link CompletableFuture} that will complete once the action provided by the action supplier completes.
* @param <T> the result type returned by the future.
*/
public static <T> CompletableFuture<T> retryWithExponentialBackoff(Supplier<CompletableFuture<T>> action,
Consumer<T> onCancel, Duration timeout, ExponentialBackoffIntervalCalculator retryIntervalCalculator,
ShouldRetry<T> shouldRetry) {
final CompletableFuture<T> result = new CompletableFuture<>();
retryWithExponentialBackoff(result, action, onCancel, timeout, retryIntervalCalculator, shouldRetry);
return result;
}

private static <T> void retryWithExponentialBackoff(CompletableFuture<T> result, Supplier<CompletableFuture<T>> action,
Consumer<T> onCancel, Duration timeout, ExponentialBackoffIntervalCalculator retryIntervalCalculator,
ShouldRetry<T> shouldRetry) {
withTimeout(action.get(), timeout).whenComplete((r, t) -> {
if (retryIntervalCalculator.shouldRetry() && !result.isDone()) {
final long retryInterval = retryIntervalCalculator.nextReconnectInterval();
if (shouldRetry.shouldRetry(r, t, retryInterval)) {
if (r != null) {
onCancel.accept(r);
}
Utils.schedule(Runnable::run,
() -> retryWithExponentialBackoff(result, action, onCancel, timeout, retryIntervalCalculator, shouldRetry),
retryInterval, TimeUnit.MILLISECONDS);
return;
}
}
if (t != null) {
result.completeExceptionally(t);
} else if (!result.complete(r)) {
onCancel.accept(r);
}
});
}

@FunctionalInterface
public interface ShouldRetry<T> {
boolean shouldRetry(T result, Throwable exception, long retryInterval);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -221,6 +222,8 @@ void testRequestTimeout() {
});

Awaitility.await().atMost(10, TimeUnit.SECONDS).until(consumeFuture::isDone);
assertThatThrownBy(consumeFuture::get)
.isInstanceOf(ExecutionException.class).hasCauseInstanceOf(TimeoutException.class);
}

}
Loading

0 comments on commit c984cff

Please sign in to comment.