Skip to content

Commit

Permalink
Merge pull request Netflix#1302 from mattrjacobs/evaluate-pr-1265
Browse files Browse the repository at this point in the history
Evaluate pr 1265
  • Loading branch information
mattrjacobs authored Aug 3, 2016
2 parents c873b7b + dc31c60 commit fc491c3
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 27 deletions.
105 changes: 81 additions & 24 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Action0;

import com.netflix.hystrix.exception.HystrixBadRequestException;
import com.netflix.hystrix.exception.HystrixRuntimeException;
Expand Down Expand Up @@ -257,7 +260,10 @@ public Setter andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter

}

/**
private final AtomicReference<Thread> executionThread = new AtomicReference<Thread>();
private final AtomicBoolean interruptOnFutureCancel = new AtomicBoolean(false);

/**
* Implement this method with code to be executed when {@link #execute()} or {@link #queue()} are invoked.
*
* @return R response type
Expand Down Expand Up @@ -295,6 +301,12 @@ public Observable<R> call() {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
// Save thread on which we get subscribed so that we can interrupt it later if needed
executionThread.set(Thread.currentThread());
}
});
}

Expand Down Expand Up @@ -356,21 +368,64 @@ public R execute() {
*/
public Future<R> queue() {
/*
* --- Schedulers.immediate()
*
* We use the 'immediate' schedule since Future.get() is blocking so we don't want to bother doing the callback to the Future on a separate thread
* as we don't need to separate the Hystrix thread from user threads since they are already providing it via the Future.get() call.
*
* We pass 'false' to tell the Observable we will block on it so it doesn't schedule an async timeout.
*
* This optimizes for using the calling thread to do the timeout rather than scheduling another thread.
*
* In a tight-loop of executing commands this optimization saves a few microseconds per execution.
* It also just makes no sense to use a separate thread to timeout the command when the calling thread
* is going to sit waiting on it.
* The Future returned by Observable.toBlocking().toFuture() does not implement the
* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
* thus, to comply with the contract of Future, we must wrap around it.
*/
final Observable<R> o = toObservable();
final Future<R> f = o.toBlocking().toFuture();
final Future<R> delegate = toObservable().toBlocking().toFuture();

final Future<R> f = new Future<R>() {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isCancelled()) {
return false;
}

if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
/*
* The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
* (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
* issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked.
* The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
* than that interruption request cannot be taken back.
*/
interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
}

final boolean res = delegate.cancel(interruptOnFutureCancel.get());

if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
final Thread t = executionThread.get();
if (t != null && !t.equals(Thread.currentThread())) {
t.interrupt();
}
}

return res;
}

@Override
public boolean isCancelled() {
return delegate.isCancelled();
}

@Override
public boolean isDone() {
return delegate.isDone();
}

@Override
public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}

@Override
public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}

};

/* special handling of error states that throw immediately */
if (f.isDone()) {
Expand All @@ -383,13 +438,15 @@ public Future<R> queue() {
return f;
} else if (re instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) re;
if (hre.getFailureType() == FailureType.COMMAND_EXCEPTION || hre.getFailureType() == FailureType.TIMEOUT) {
// we don't throw these types from queue() only from queue().get() as they are execution errors
return f;
} else {
// these are errors we throw from queue() as they as rejection type errors
throw hre;
}
switch (hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
// we don't throw these types from queue() only from queue().get() as they are execution errors
return f;
default:
// these are errors we throw from queue() as they as rejection type errors
throw hre;
}
} else {
throw re;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public abstract class HystrixCommandProperties {
private static final Boolean default_executionTimeoutEnabled = true;
private static final ExecutionIsolationStrategy default_executionIsolationStrategy = ExecutionIsolationStrategy.THREAD;
private static final Boolean default_executionIsolationThreadInterruptOnTimeout = true;
private static final Boolean default_executionIsolationThreadInterruptOnFutureCancel = false;
private static final Boolean default_metricsRollingPercentileEnabled = true;
private static final Boolean default_requestCacheEnabled = true;
private static final Integer default_fallbackIsolationSemaphoreMaxConcurrentRequests = 10;
Expand Down Expand Up @@ -77,6 +78,7 @@ public abstract class HystrixCommandProperties {
private final HystrixProperty<Integer> fallbackIsolationSemaphoreMaxConcurrentRequests; // Number of permits for fallback semaphore
private final HystrixProperty<Boolean> fallbackEnabled; // Whether fallback should be attempted.
private final HystrixProperty<Boolean> executionIsolationThreadInterruptOnTimeout; // Whether an underlying Future/Thread (when runInSeparateThread == true) should be interrupted after a timeout
private final HystrixProperty<Boolean> executionIsolationThreadInterruptOnFutureCancel; // Whether canceling an underlying Future/Thread (when runInSeparateThread == true) should interrupt the execution thread
private final HystrixProperty<Integer> metricsRollingStatisticalWindowInMilliseconds; // milliseconds back that will be tracked
private final HystrixProperty<Integer> metricsRollingStatisticalWindowBuckets; // number of buckets in the statisticalWindow
private final HystrixProperty<Boolean> metricsRollingPercentileEnabled; // Whether monitoring should be enabled (SLA and Tracers).
Expand Down Expand Up @@ -121,6 +123,7 @@ protected HystrixCommandProperties(HystrixCommandKey key, HystrixCommandProperti
this.executionTimeoutInMilliseconds = getProperty(propertyPrefix, key, "execution.isolation.thread.timeoutInMilliseconds", builder.getExecutionIsolationThreadTimeoutInMilliseconds(), default_executionTimeoutInMilliseconds);
this.executionTimeoutEnabled = getProperty(propertyPrefix, key, "execution.timeout.enabled", builder.getExecutionTimeoutEnabled(), default_executionTimeoutEnabled);
this.executionIsolationThreadInterruptOnTimeout = getProperty(propertyPrefix, key, "execution.isolation.thread.interruptOnTimeout", builder.getExecutionIsolationThreadInterruptOnTimeout(), default_executionIsolationThreadInterruptOnTimeout);
this.executionIsolationThreadInterruptOnFutureCancel = getProperty(propertyPrefix, key, "execution.isolation.thread.interruptOnFutureCancel", builder.getExecutionIsolationThreadInterruptOnFutureCancel(), default_executionIsolationThreadInterruptOnFutureCancel);
this.executionIsolationSemaphoreMaxConcurrentRequests = getProperty(propertyPrefix, key, "execution.isolation.semaphore.maxConcurrentRequests", builder.getExecutionIsolationSemaphoreMaxConcurrentRequests(), default_executionIsolationSemaphoreMaxConcurrentRequests);
this.fallbackIsolationSemaphoreMaxConcurrentRequests = getProperty(propertyPrefix, key, "fallback.isolation.semaphore.maxConcurrentRequests", builder.getFallbackIsolationSemaphoreMaxConcurrentRequests(), default_fallbackIsolationSemaphoreMaxConcurrentRequests);
this.fallbackEnabled = getProperty(propertyPrefix, key, "fallback.enabled", builder.getFallbackEnabled(), default_fallbackEnabled);
Expand Down Expand Up @@ -240,6 +243,17 @@ public HystrixProperty<Boolean> executionIsolationThreadInterruptOnTimeout() {
return executionIsolationThreadInterruptOnTimeout;
}

/**
* Whether the execution thread should be interrupted if the execution observable is unsubscribed or the future is cancelled via {@link Future#cancel(true)}).
* <p>
* Applicable only when {@link #executionIsolationStrategy()} == THREAD.
*
* @return {@code HystrixProperty<Boolean>}
*/
public HystrixProperty<Boolean> executionIsolationThreadInterruptOnFutureCancel() {
return executionIsolationThreadInterruptOnFutureCancel;
}

/**
* Allow a dynamic override of the {@link HystrixThreadPoolKey} that will dynamically change which {@link HystrixThreadPool} a {@link HystrixCommand} executes on.
* <p>
Expand Down Expand Up @@ -531,6 +545,7 @@ public static class Setter {
private Integer executionIsolationSemaphoreMaxConcurrentRequests = null;
private ExecutionIsolationStrategy executionIsolationStrategy = null;
private Boolean executionIsolationThreadInterruptOnTimeout = null;
private Boolean executionIsolationThreadInterruptOnFutureCancel = null;
private Integer executionTimeoutInMilliseconds = null;
private Boolean executionTimeoutEnabled = null;
private Integer fallbackIsolationSemaphoreMaxConcurrentRequests = null;
Expand Down Expand Up @@ -585,7 +600,11 @@ public Boolean getExecutionIsolationThreadInterruptOnTimeout() {
return executionIsolationThreadInterruptOnTimeout;
}

/**
public Boolean getExecutionIsolationThreadInterruptOnFutureCancel() {
return executionIsolationThreadInterruptOnFutureCancel;
}

/**
* @deprecated As of 1.4.0, use {@link #getExecutionTimeoutInMilliseconds()}
*/
@Deprecated
Expand Down Expand Up @@ -690,6 +709,11 @@ public Setter withExecutionIsolationThreadInterruptOnTimeout(boolean value) {
return this;
}

public Setter withExecutionIsolationThreadInterruptOnFutureCancel(boolean value) {
this.executionIsolationThreadInterruptOnFutureCancel = value;
return this;
}

/**
* @deprecated As of 1.4.0, replaced with {@link #withExecutionTimeoutInMilliseconds(int)}. Timeouts are no longer applied only to thread-isolated commands, so a thread-specific name is misleading
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class HystrixCommandPropertiesTest {
.withExecutionTimeoutEnabled(true)
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.THREAD) // we want thread execution by default in tests
.withExecutionIsolationThreadInterruptOnTimeout(true)
.withExecutionIsolationThreadInterruptOnFutureCancel(true)
.withCircuitBreakerForceOpen(false) // we don't want short-circuiting by default
.withCircuitBreakerErrorThresholdPercentage(40) // % of 'marks' that must be failed to trip the circuit
.withMetricsRollingStatisticalWindowInMilliseconds(5000)// milliseconds back that will be tracked
Expand Down Expand Up @@ -111,6 +112,11 @@ public HystrixProperty<Boolean> executionIsolationThreadInterruptOnTimeout() {
return HystrixProperty.Factory.asProperty(builder.getExecutionIsolationThreadInterruptOnTimeout());
}

@Override
public HystrixProperty<Boolean> executionIsolationThreadInterruptOnFutureCancel() {
return HystrixProperty.Factory.asProperty(builder.getExecutionIsolationThreadInterruptOnFutureCancel());
}

@Override
public HystrixProperty<String> executionIsolationThreadPoolKeyOverride() {
return HystrixProperty.Factory.nullProperty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -2759,6 +2760,66 @@ public void testDoNotInterruptToObservableOnTimeoutIfPropertySaysNotTo() throws
assertFalse(cmd.hasBeenInterrupted());
}

@Test
public void testCancelFutureWithInterruptionWhenPropertySaysNotTo() throws InterruptedException, ExecutionException {
// given
InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker(), true, false, 1000);

// when
Future<Boolean> f = cmd.queue();
Thread.sleep(500);
f.cancel(true);
Thread.sleep(500);

// then
try {
f.get();
fail("Should have thrown a CancellationException");
} catch (CancellationException e) {
assertFalse(cmd.hasBeenInterrupted());
}
}

@Test
public void testCancelFutureWithInterruption() throws InterruptedException, ExecutionException {
// given
InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker(), true, true, 1000);

// when
Future<Boolean> f = cmd.queue();
Thread.sleep(500);
f.cancel(true);
Thread.sleep(500);

// then
try {
f.get();
fail("Should have thrown a CancellationException");
} catch (CancellationException e) {
assertTrue(cmd.hasBeenInterrupted());
}
}

@Test
public void testCancelFutureWithoutInterruption() throws InterruptedException, ExecutionException, TimeoutException {
// given
InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker(), true, true, 1000);

// when
Future<Boolean> f = cmd.queue();
Thread.sleep(500);
f.cancel(false);
Thread.sleep(500);

// then
try {
f.get();
fail("Should have thrown a CancellationException");
} catch (CancellationException e) {
assertFalse(cmd.hasBeenInterrupted());
}
}

@Test
public void testChainedCommand() {
class SubCommand extends TestHystrixCommand<Integer> {
Expand Down Expand Up @@ -5340,12 +5401,17 @@ protected Boolean run() throws Exception {

private static class InterruptibleCommand extends TestHystrixCommand<Boolean> {

public InterruptibleCommand(TestCircuitBreaker circuitBreaker, boolean shouldInterrupt) {
public InterruptibleCommand(TestCircuitBreaker circuitBreaker, boolean shouldInterrupt, boolean shouldInterruptOnCancel, int timeoutInMillis) {
super(testPropsBuilder()
.setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics)
.setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter()
.withExecutionIsolationThreadInterruptOnFutureCancel(shouldInterruptOnCancel)
.withExecutionIsolationThreadInterruptOnTimeout(shouldInterrupt)
.withExecutionTimeoutInMilliseconds(100)));
.withExecutionTimeoutInMilliseconds(timeoutInMillis)));
}

public InterruptibleCommand(TestCircuitBreaker circuitBreaker, boolean shouldInterrupt) {
this(circuitBreaker, shouldInterrupt, false, 100);
}

private volatile boolean hasBeenInterrupted;
Expand Down

0 comments on commit fc491c3

Please sign in to comment.