You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I have use for a Retryer that retries methods that returns futures. If there is any interest I would be happy to work on contributing it back to this project.
package com.github.rholder.retry;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.WaitStrategies;
import com.github.rholder.retry.WaitStrategy;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.StopStrategy;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Retries calls that returns a {@link ListenableFuture} by itself returning a {@link ListenableFuture}.
*/
public class FutureRetryer<I, O> {
private static final Logger logger = LoggerFactory.getLogger(FutureRetryer.class);
private final Function<I, ListenableFuture<O>> wrappedCall;
private final I input;
private final WaitStrategy waitStrategy;
private final StopStrategy stopStrategy;
private final Predicate<Attempt<Object>> retryableExceptionPredicate;
private final ScheduledExecutorService scheduledExecutorService;
private final SettableFuture<O> responseFuture;
private final AtomicInteger attemptCount = new AtomicInteger(0);
private final long startTime;
public FutureRetryer(Function<I, ListenableFuture<O>> wrappedCall, I input, WaitStrategy waitStrategy,
StopStrategy stopStrategy, Predicate<Attempt<Object>> rejectionPredicate,
ScheduledExecutorService scheduledExecutorService) {
this.wrappedCall = wrappedCall;
this.input = input;
this.waitStrategy = waitStrategy;
this.stopStrategy = stopStrategy;
this.retryableExceptionPredicate = rejectionPredicate;
this.scheduledExecutorService = scheduledExecutorService;
this.responseFuture = SettableFuture.create();
this.startTime = System.nanoTime();
}
public ListenableFuture<O> performAction() {
performActionImpl();
return this.responseFuture;
}
public int getAttemptCount() {
return this.attemptCount.get();
}
private void performActionImpl() {
ListenableFuture<O> callResponseFuture = wrappedCall.apply(this.input);
Futures.addCallback(callResponseFuture, new FutureCallback<O>() {
@Override
public void onSuccess(final O result) {
handleSuccessfulResponse(result);
}
@Override
public void onFailure(final Throwable throwable) {
handleFailureResponse(throwable);
}
});
}
/**
* Handles successful response by writing to the responseFuture.
*/
private void handleSuccessfulResponse(O result) {
this.responseFuture.set(result);
}
/**
* Handles failure response by retrying if the failure is retryable and the all attempts have not been
* exhausted. In case a retry is not possible the last exception is set on the responseFuture.
*/
private void handleFailureResponse(Throwable throwable) {
int currentAttemptCount = this.attemptCount.get();
ExceptionAttempt attempt = new ExceptionAttempt(throwable, currentAttemptCount,
System.nanoTime() - this.startTime);
// If the retryable exception predicate does not allow the last attempt then set the exception on the
// response future and end all further retries.
if (!this.retryableExceptionPredicate.apply((Attempt<Object>) attempt)) {
this.responseFuture.setException(throwable);
return;
}
// check if the no of retries is exhausted
if (stopStrategy.shouldStop(attempt)) {
this.responseFuture.setException(throwable);
return;
}
// increment after it is known that a retry should happen
this.attemptCount.incrementAndGet();
// schedule retry based after some delay
long delayTime = this.waitStrategy.computeSleepTime(attempt);
// schedule for delayed execution.
this.scheduledExecutorService.schedule(() -> this.performActionImpl(), delayTime, TimeUnit.MILLISECONDS);
}
public static class Builder<IB, OB> {
private static final int DEFAULT_STOP_ATTEMPT = 10;
private static final int DEFAULT_WAIT_MULTIPLIER = 300;
private static final int DEFAULT_WAIT_MAX = 60;
private static final TimeUnit DEFAULT_WAIT_MAX_UNIT = TimeUnit.SECONDS;
private Function<IB, ListenableFuture<OB>> wrappedCall;
private IB input;
private WaitStrategy waitStrategy;
private StopStrategy stopStrategy;
private Predicate<Attempt<Object>> rejectionPredicate = Predicates.alwaysFalse();
private ScheduledExecutorService scheduledExecutorService;
public Builder<IB, OB> setWrappedCall(Function<IB, ListenableFuture<OB>> wrappedCall) {
this.wrappedCall = wrappedCall;
return this;
}
public Builder<IB, OB> setInput(IB input) {
this.input = input;
return this;
}
public Builder<IB, OB> setWaitStrategy(WaitStrategy waitStrategy) {
this.waitStrategy = waitStrategy;
return this;
}
public Builder<IB, OB> setStopStrategy(StopStrategy stopStrategy) {
this.stopStrategy = stopStrategy;
return this;
}
public Builder<IB, OB> retryIfExceptionOfType(Class<? extends Throwable> exceptionClass) {
this.rejectionPredicate = (Predicate<Attempt<Object>>) Predicates.or(this.rejectionPredicate,
new ExceptionClassPredicate(exceptionClass));
return this;
}
public Builder<IB, OB> setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService = scheduledExecutorService;
return this;
}
public FutureRetryer<IB, OB> build() {
Preconditions.checkNotNull(this.wrappedCall, "Wrapped call is required.");
Preconditions.checkNotNull(this.scheduledExecutorService, "Scheduled executor service is required.");
if (this.stopStrategy == null) {
logger.warn("No StopStrategy provided, using default strategy.");
this.stopStrategy = StopStrategies.stopAfterAttempt(DEFAULT_STOP_ATTEMPT);
}
if (this.waitStrategy == null) {
logger.warn("No WaitStrategy provided, using default strategy.");
this.waitStrategy = WaitStrategies.exponentialWait(DEFAULT_WAIT_MULTIPLIER, DEFAULT_WAIT_MAX,
DEFAULT_WAIT_MAX_UNIT);
}
return new FutureRetryer<>(
this.wrappedCall,
this.input,
this.waitStrategy,
this.stopStrategy,
this.rejectionPredicate,
this.scheduledExecutorService
);
}
}
/**
* Attempt impl to be used with an Exception.
*/
static final class ExceptionAttempt implements Attempt<Object> {
private final ExecutionException e;
private final long attemptNumber;
private final long delaySinceFirstAttempt;
ExceptionAttempt(Throwable cause, long attemptNumber, long delaySinceFirstAttempt) {
this.e = new ExecutionException(cause);
this.attemptNumber = attemptNumber;
this.delaySinceFirstAttempt = delaySinceFirstAttempt;
}
@Override
public Object get() throws ExecutionException {
throw this.e;
}
@Override
public boolean hasResult() {
return false;
}
@Override
public boolean hasException() {
return true;
}
@Override
public Object getResult() throws IllegalStateException {
throw new IllegalStateException("The attempt resulted in an exception, not in a result");
}
@Override
public Throwable getExceptionCause() throws IllegalStateException {
return this.e.getCause();
}
@Override
public long getAttemptNumber() {
return attemptNumber;
}
@Override
public long getDelaySinceFirstAttempt() {
return delaySinceFirstAttempt;
}
}
static final class ExceptionClassPredicate implements Predicate<Attempt<Object>> {
private Class<? extends Throwable> exceptionClass;
ExceptionClassPredicate(Class<? extends Throwable> exceptionClass) {
this.exceptionClass = exceptionClass;
}
@Override
public boolean apply(Attempt<Object> attempt) {
if (!attempt.hasException()) {
return false;
}
return exceptionClass.isAssignableFrom(attempt.getExceptionCause().getClass());
}
}
}
I also have it in a gist https://gist.github.com/manasdk/ea816f45b26ff4a74f99e2c99116b0b5. If there is interest I can create a PR and send it out for review. I also wouldn't mind if someone took over the code and wrote it in a cleaner way that fits better with this project.
The text was updated successfully, but these errors were encountered:
I have use for a Retryer that retries methods that returns futures. If there is any interest I would be happy to work on contributing it back to this project.
I also have it in a gist https://gist.github.com/manasdk/ea816f45b26ff4a74f99e2c99116b0b5. If there is interest I can create a PR and send it out for review. I also wouldn't mind if someone took over the code and wrote it in a cleaner way that fits better with this project.
The text was updated successfully, but these errors were encountered: