Skip to content

Commit

Permalink
Merge pull request Netflix#1445 from mattrjacobs/early-unsubscribe-ho…
Browse files Browse the repository at this point in the history
…ok-test

Early unsubscribe hook test
  • Loading branch information
mattrjacobs authored Dec 15, 2016
2 parents 03d2836 + 9378b96 commit e08676e
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,23 @@ public void call() {
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
try {
executionHook.onUnsubscribe(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
}
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
try {
executionHook.onUnsubscribe(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
}
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
Expand Down Expand Up @@ -2169,6 +2179,11 @@ public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
actual.onCacheHit(commandInstance);
}

@Override
public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
actual.onUnsubscribe(commandInstance);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private <T> HystrixCommand<T> getHystrixCommandFromAbstractIfApplicable(HystrixInvokable<T> commandInstance) {
if (commandInstance instanceof HystrixCommand) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,17 @@ public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
//do nothing by default
}

/**
* Invoked with the command is unsubscribed before a terminal state
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.5.9
*/
public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
//do nothing by default
}

/**
* DEPRECATED: Change usages of this to {@link #onExecutionStart}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4010,6 +4010,55 @@ public void call(TestHystrixCommand<Integer> command) {
});
}

@Test
public void testExecutionHookEarlyUnsubscribe() {
System.out.println("Running command.observe(), awaiting terminal state of Observable, then running assertions...");
final CountDownLatch latch = new CountDownLatch(1);

TestHystrixCommand<Integer> command = getCommand(ExecutionIsolationStrategy.THREAD, AbstractTestHystrixCommand.ExecutionResult.SUCCESS, 1000);
Observable<Integer> o = command.observe();

Subscription s = o.
doOnUnsubscribe(new Action0() {
@Override
public void call() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnUnsubscribe");
latch.countDown();
}
}).
subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnCompleted");
latch.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnError : " + e);
latch.countDown();
}

@Override
public void onNext(Integer i) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnNext : " + i);
}
});

try {
Thread.sleep(15);
s.unsubscribe();
latch.await(3, TimeUnit.SECONDS);
TestableExecutionHook hook = command.getBuilder().executionHook;
assertTrue(hook.commandEmissionsMatch(0, 0, 0));
assertTrue(hook.executionEventsMatch(0, 0, 0));
assertTrue(hook.fallbackEventsMatch(0, 0, 0));
assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onUnsubscribe - onThreadComplete - ", hook.executionSequence.toString());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Short-circuit? : NO
* Thread/semaphore: THREAD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,14 @@ public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
recordHookCall(executionSequence, "onCacheHit");
}

@Override
public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
super.onUnsubscribe(commandInstance);
recordHookCall(executionSequence, "onUnsubscribe");
}

/**
* DEPRECATED METHODS FOLLOW. The string representation starts with `!D!` to distinguish
* DEPRECATED METHODS FOLLOW. The string representation starts with `!` to distinguish
*/

AtomicInteger startExecute = new AtomicInteger();
Expand Down

0 comments on commit e08676e

Please sign in to comment.