Skip to content

Commit

Permalink
Merge pull request Netflix#1300 from mattrjacobs/scalar-command-termi…
Browse files Browse the repository at this point in the history
…nates-in-on-next

Scalar command terminates in on next
  • Loading branch information
mattrjacobs authored Aug 3, 2016
2 parents da1198c + 5b9bdae commit c873b7b
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 31 deletions.
62 changes: 38 additions & 24 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,9 @@ public Observable<R> toObservable() {
@Override
public void call() {
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
handleCommandEnd(_cmd, false); //user code never ran
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
handleCommandEnd(_cmd, true); //user code did run
handleCommandEnd(true); //user code did run
}
}
};
Expand All @@ -382,15 +382,19 @@ public void call() {
@Override
public void call() {
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
handleCommandEnd(_cmd, false); //user code never ran
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
handleCommandEnd(_cmd, true); //user code did run
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
handleCommandEnd(true); //user code did run
}
}
};
Expand Down Expand Up @@ -468,7 +472,6 @@ public Observable<R> call() {
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);


Observable<R> afterCache;

// put in cache
Expand Down Expand Up @@ -541,6 +544,8 @@ public void call(Throwable t) {
}
}

abstract protected boolean commandIsScalar();

/**
* This decorates "Hystrix" functionality around the run() Observable.
*
Expand All @@ -556,17 +561,26 @@ public void call(R r) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
if (commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
}
}
};

final Action0 markCompleted = new Action0() {
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
if (!commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
}
}
};

Expand Down Expand Up @@ -611,7 +625,7 @@ public void call(Notification<? super R> rNotification) {
}

return execution.doOnNext(markEmits)
.doOnCompleted(markCompleted)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
Expand Down Expand Up @@ -877,25 +891,25 @@ private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandRes
@Override
public void call() {
if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
cleanUpAfterResponseFromCache(_cmd, false); //user code never ran
cleanUpAfterResponseFromCache(false); //user code never ran
} else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
cleanUpAfterResponseFromCache(_cmd, true); //user code did run
cleanUpAfterResponseFromCache(true); //user code did run
}
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
cleanUpAfterResponseFromCache(_cmd, false); //user code never ran
cleanUpAfterResponseFromCache(false); //user code never ran
} else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
cleanUpAfterResponseFromCache(_cmd, true); //user code did run
cleanUpAfterResponseFromCache(true); //user code did run
}
}
});
}

private void cleanUpAfterResponseFromCache(final AbstractCommand<R> _cmd, boolean commandExecutionStarted) {
private void cleanUpAfterResponseFromCache(boolean commandExecutionStarted) {
Reference<TimerListener> tl = timeoutTimer.get();
if (tl != null) {
tl.clear();
Expand All @@ -912,7 +926,7 @@ private void cleanUpAfterResponseFromCache(final AbstractCommand<R> _cmd, boolea
eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey);
}

private void handleCommandEnd(final AbstractCommand _cmd, boolean commandExecutionStarted) {
private void handleCommandEnd(boolean commandExecutionStarted) {
Reference<TimerListener> tl = timeoutTimer.get();
if (tl != null) {
tl.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,16 @@ public class ExecutionResult {
private static final HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();
private static final int NUM_EVENT_TYPES = ALL_EVENT_TYPES.length;
private static final BitSet EXCEPTION_PRODUCING_EVENTS = new BitSet(NUM_EVENT_TYPES);
private static final BitSet TERMINAL_EVENTS = new BitSet(NUM_EVENT_TYPES);

static {
for (HystrixEventType eventType: HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES) {
EXCEPTION_PRODUCING_EVENTS.set(eventType.ordinal());
}

for (HystrixEventType eventType: HystrixEventType.TERMINAL_EVENT_TYPES) {
TERMINAL_EVENTS.set(eventType.ordinal());
}
}

public static class EventCounts {
Expand Down Expand Up @@ -353,6 +358,10 @@ public boolean executionOccurred() {
return executionOccurred;
}

public boolean containsTerminalEvent() {
return eventCounts.containsAnyOf(TERMINAL_EVENTS);
}

@Override
public String toString() {
return "ExecutionResult{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,4 +404,8 @@ protected String getFallbackMethodName() {
return "getFallback";
}

@Override
protected boolean commandIsScalar() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,24 @@ public Observable<R> toObservableWithStateCopiedInto(final AbstractCommand<R> co
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (!completionLogicRun.get()) {
if (completionLogicRun.compareAndSet(false, true)) {
commandCompleted(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
if (!completionLogicRun.get()) {
if (completionLogicRun.compareAndSet(false, true)) {
commandCompleted(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (!completionLogicRun.get()) {
if (completionLogicRun.compareAndSet(false, true)) {
commandUnsubscribed(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,22 @@ public static HystrixEventType from(HystrixRollingNumberEvent event) {
*/
public final static List<HystrixEventType> EXCEPTION_PRODUCING_EVENT_TYPES = new ArrayList<HystrixEventType>();

/**
* List of events that are terminal
*/
public final static List<HystrixEventType> TERMINAL_EVENT_TYPES = new ArrayList<HystrixEventType>();

static {
EXCEPTION_PRODUCING_EVENT_TYPES.add(BAD_REQUEST);
EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_FAILURE);
EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_MISSING);
EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_REJECTION);

for (HystrixEventType eventType: HystrixEventType.values()) {
if (eventType.isTerminal()) {
TERMINAL_EVENT_TYPES.add(eventType);
}
}
}

public enum ThreadPool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ protected String getFallbackMethodName() {
return "resumeWithFallback";
}

@Override
protected boolean commandIsScalar() {
return false;
}

/**
* Construct a {@link HystrixObservableCommand} with defined {@link Setter} that allows injecting property and strategy overrides and other optional arguments.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public void testSingleTestOnOpenCircuitAfterTimeWindow() {
public void testCircuitClosedAfterSuccess() {
String key = "cmd-G";
try {
int sleepWindow = 20;
int sleepWindow = 100;
HystrixCommand<Boolean> cmd1 = new FailureCommand(key, 1, sleepWindow);
HystrixCircuitBreaker cb = cmd1.circuitBreaker;

Expand Down Expand Up @@ -397,6 +397,8 @@ public void testCircuitClosedAfterSuccess() {
asyncResult.toBlocking().single();

// all requests should be open again

Thread.sleep(100);
System.out.println("CircuitBreaker state 2 : " + cmd1.getMetrics().getHealthCounts());
assertTrue(cb.allowRequest());
assertTrue(cb.allowRequest());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -3686,6 +3688,99 @@ public void onNext(Boolean b) {
}
}

/**
* Some RxJava operators like take(n), zip receive data in an onNext from upstream and immediately unsubscribe.
* When upstream is a HystrixCommand, Hystrix may get that unsubscribe before it gets to its onCompleted.
* This should still be marked as a HystrixEventType.SUCCESS.
*/
@Test
public void testUnsubscribingDownstreamOperatorStillResultsInSuccessEventType() throws InterruptedException {
HystrixCommand<Integer> cmd = getCommand(ExecutionIsolationStrategy.THREAD, AbstractTestHystrixCommand.ExecutionResult.SUCCESS, 100, AbstractTestHystrixCommand.FallbackResult.UNIMPLEMENTED);

Observable<Integer> o = cmd.toObservable()
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.println(Thread.currentThread().getName() + " : " + System.currentTimeMillis() + " CMD OnNext : " + i);
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(Thread.currentThread().getName() + " : " + System.currentTimeMillis() + " CMD OnError : " + throwable);
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
System.out.println(Thread.currentThread().getName() + " : " + System.currentTimeMillis() + " CMD OnCompleted");
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
System.out.println(Thread.currentThread().getName() + " : " + System.currentTimeMillis() + " CMD OnSubscribe");
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
System.out.println(Thread.currentThread().getName() + " : " + System.currentTimeMillis() + " CMD OnUnsubscribe");
}
})
.take(1)
.observeOn(Schedulers.io())
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer i) {
System.out.println(Thread.currentThread().getName() + " : " + System.currentTimeMillis() + " : Doing some more computation in the onNext!!");
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
return i;
}
});

final CountDownLatch latch = new CountDownLatch(1);

o.doOnSubscribe(new Action0() {
@Override
public void call() {
System.out.println(Thread.currentThread().getName() + " : " + System.currentTimeMillis() + " : OnSubscribe");
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
System.out.println(Thread.currentThread().getName() + " : " + System.currentTimeMillis() + " : OnUnsubscribe");
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println(Thread.currentThread().getName() + " : " + System.currentTimeMillis() + " : OnCompleted");
latch.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println(Thread.currentThread().getName() + " : " + System.currentTimeMillis() + " : OnError : " + e);
latch.countDown();
}

@Override
public void onNext(Integer i) {
System.out.println(Thread.currentThread().getName() + " : " + System.currentTimeMillis() + " : OnNext : " + i);
}
});

latch.await(1000, TimeUnit.MILLISECONDS);

System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
assertTrue(cmd.isExecutedInThread());
assertCommandExecutionEvents(cmd, HystrixEventType.SUCCESS);
}

/**
*********************** THREAD-ISOLATED Execution Hook Tests **************************************
*/
Expand Down

0 comments on commit c873b7b

Please sign in to comment.