Skip to content

Commit

Permalink
Merge pull request #511 from mattrjacobs/missing-short-circuit-on-com…
Browse files Browse the repository at this point in the history
…plete-hook

Matching hook orders from 1.3.x
  • Loading branch information
mattrjacobs committed Jan 14, 2015
2 parents aac5e8c + 07ae6d2 commit ef4794d
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 19 deletions.
57 changes: 42 additions & 15 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ protected static enum TimedOutStatus {
protected final AtomicReference<TimedOutStatus> isCommandTimedOut = new AtomicReference<TimedOutStatus>(TimedOutStatus.NOT_EXECUTED);
protected final AtomicBoolean isExecutionComplete = new AtomicBoolean(false);
protected final AtomicBoolean isExecutedInThread = new AtomicBoolean(false);
protected final AtomicReference<Action0> endCurrentThreadExecutingCommand = new AtomicReference<Action0>(); // don't like how this is being done


/**
* Instance of RequestCache logic
Expand Down Expand Up @@ -409,7 +411,17 @@ public void call() {
metrics.markShortCircuited();
// short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
try {
getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited").unsafeSubscribe(observer);
getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited")
.map(new Func1<R, R>() {

@Override
public R call(R t1) {
// allow transforming the results via the executionHook if the fallback succeeds
return executionHook.onComplete(_this, t1);
}

})
.unsafeSubscribe(observer);
} catch (Exception e) {
observer.onError(e);
}
Expand Down Expand Up @@ -482,7 +494,6 @@ private Observable<R> getRunObservableDecoratedForMetricsAndErrorHandling(final
metrics.incrementConcurrentExecutionCount();

final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
final AtomicReference<Action0> endCurrentThreadExecutingCommand = new AtomicReference<Action0>(); // don't like how this is being done

Observable<R> run = null;
if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
Expand All @@ -505,7 +516,12 @@ public void call(Subscriber<? super R> s) {
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
isExecutedInThread.set(true);
getExecutionObservable().unsafeSubscribe(s);
getExecutionObservable().map(new Func1<R, R>() {
@Override
public R call(R r) {
return executionHook.onRunSuccess(_self, r);
}
}).unsafeSubscribe(s);
} catch (Throwable t) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
Expand All @@ -521,7 +537,12 @@ public void call(Subscriber<? super R> s) {
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
try {
run = getExecutionObservable();
run = getExecutionObservable().map(new Func1<R, R>() {
@Override
public R call(R r) {
return executionHook.onRunSuccess(_self, r);
}
});
} catch (Throwable t) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
Expand All @@ -547,8 +568,7 @@ public R call(R t1) {
executionResult = executionResult.addEvents(HystrixEventType.SUCCESS);
once = true;
}

return executionHook.onRunSuccess(_self, t1);
return t1;
}

}).doOnCompleted(new Action0() {
Expand Down Expand Up @@ -577,7 +597,7 @@ public Observable<R> call(Throwable t) {
} else if (t instanceof HystrixObservableTimeoutOperator.HystrixTimeoutException) {
/**
* Timeout handling
*
*
* Callback is performed on the HystrixTimer thread.
*/
return getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
Expand Down Expand Up @@ -624,7 +644,7 @@ public Observable<R> call(Throwable t) {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException){
if (e instanceof HystrixBadRequestException) {
return Observable.error(e);
}

Expand All @@ -647,13 +667,10 @@ public void call(Notification<? super R> n) {
}).doOnTerminate(new Action0() {
@Override
public void call() {
// pop the command that is being run
if (endCurrentThreadExecutingCommand.get() != null) {
endCurrentThreadExecutingCommand.get().call();
}
if (isExecutedInThread.get()) {
threadPool.markThreadCompletion();
executionHook.onThreadComplete(_self);
//if the command timed out, then we've reached this point in the calling thread
//but the Hystrix thread is still doing work. Let it handle these markers.
if (!isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd();
}
}
}).map(new Func1<R, R>() {
Expand Down Expand Up @@ -854,6 +871,16 @@ public void call(Notification<? super R> n) {
}
}

protected void handleThreadEnd() {
if (endCurrentThreadExecutingCommand.get() != null) {
endCurrentThreadExecutingCommand.get().call();
}
if (isExecutedInThread.get()) {
threadPool.markThreadCompletion();
executionHook.onThreadComplete(this);
}
}

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

final AbstractCommand<R> originalCommand;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ private static void _reset() {
HystrixCircuitBreaker.Factory.reset();
HystrixPlugins.reset();
HystrixPropertiesFactory.reset();
currentCommand.set(new LinkedList<HystrixCommandKey>());
}

private static ThreadLocal<LinkedList<HystrixCommandKey>> currentCommand = new ThreadLocal<LinkedList<HystrixCommandKey>>() {
Expand Down
11 changes: 11 additions & 0 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
Expand Down Expand Up @@ -304,6 +306,15 @@ public void call(Subscriber<? super R> s) {
}
}

}).doOnTerminate(new Action0() {
@Override
public void call() {
//If the command timed out, then the calling thread has already walked away so we need
//to handle these markers. Otherwise, the calling thread will perform these for us.
if (isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd();
}
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.netflix.hystrix;

import rx.Observable;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3899,7 +3899,7 @@ public void testExecutionHookTimeoutWithoutFallback() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onFallbackStart - onFallbackError - onError - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onFallbackStart - onFallbackError - onError - onRunSuccess - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());

}

Expand Down Expand Up @@ -3948,7 +3948,7 @@ public void testExecutionHookTimeoutWithFallback() {
assertEquals(1, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onThreadStart - onRunStart - onFallbackStart - onFallbackSuccess - onComplete - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());
assertEquals("onStart - onThreadStart - onRunStart - onFallbackStart - onFallbackSuccess - onComplete - onRunSuccess - onThreadComplete - ", command.builder.executionHook.executionSequence.toString());
}

/**
Expand Down Expand Up @@ -4007,11 +4007,58 @@ public void testExecutionHookRejectedWithFallback() {

}

/**
* Execution hook on short-circuited with a fallback
*/
@Test
public void testExecutionHookShortCircuitedWithFallback() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker().setForceShortCircuit(true);
KnownFailureTestCommandWithFallback command = new KnownFailureTestCommandWithFallback(circuitBreaker);

try {
// now execute one that will be short-circuited
command.queue().get();
} catch (Exception e) {
throw new RuntimeException("not expecting", e);
}

assertTrue(command.isResponseShortCircuited());

// the run() method should not run as we're short-circuited
assertEquals(0, command.builder.executionHook.startRun.get());
// we should not have a response because of short-circuit
assertNull(command.builder.executionHook.runSuccessResponse);
// we should not have an exception because we didn't run
assertNull(command.builder.executionHook.runFailureException);

// the fallback() method should be run due to short-circuit
assertEquals(1, command.builder.executionHook.startFallback.get());
// response since we have a fallback
assertNotNull(command.builder.executionHook.fallbackSuccessResponse);
// null since fallback succeeds
assertNull(command.builder.executionHook.fallbackFailureException);

// execution occurred
assertEquals(1, command.builder.executionHook.startExecute.get());
// we should have a response because of fallback
assertNotNull(command.builder.executionHook.endExecuteSuccessResponse);
// we should not have an exception because of fallback
assertNull(command.builder.executionHook.endExecuteFailureException);

// thread execution
assertEquals(0, command.builder.executionHook.threadStart.get());
assertEquals(0, command.builder.executionHook.threadComplete.get());

// expected hook execution sequence
assertEquals("onStart - onFallbackStart - onFallbackSuccess - onComplete - ", command.builder.executionHook.executionSequence.toString());

}

/**
* Execution hook on short-circuit with a fallback
*/
@Test
public void testExecutionHookShortCircuitedWithFallbackViaQueue() {
public void testExecutionHookShortCircuitedWithoutFallbackViaQueue() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker().setForceShortCircuit(true);
KnownFailureTestCommandWithoutFallback command = new KnownFailureTestCommandWithoutFallback(circuitBreaker);
try {
Expand Down Expand Up @@ -4059,7 +4106,7 @@ public void testExecutionHookShortCircuitedWithFallbackViaQueue() {
* Execution hook on short-circuit with a fallback
*/
@Test
public void testExecutionHookShortCircuitedWithFallbackViaExecute() {
public void testExecutionHookShortCircuitedWithoutFallbackViaExecute() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker().setForceShortCircuit(true);
KnownFailureTestCommandWithoutFallback command = new KnownFailureTestCommandWithoutFallback(circuitBreaker);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import org.junit.Before;
import org.junit.Test;

import com.netflix.hystrix.HystrixCommand.Setter;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;

public class HystrixTest {
@Before
public void reset() {
Hystrix.reset();
}

@Test
public void testNotInThread() {
assertNull(Hystrix.getCurrentThreadExecutingCommand());
Expand Down

0 comments on commit ef4794d

Please sign in to comment.