Skip to content

Commit

Permalink
Add onUnsubscribe hook to command execution and wire in to command …
Browse files Browse the repository at this point in the history
…flow
  • Loading branch information
mattrjacobs committed Dec 15, 2016
1 parent 55a5470 commit 9378b96
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,23 @@ public void call() {
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
try {
executionHook.onUnsubscribe(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
}
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
try {
executionHook.onUnsubscribe(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
}
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
Expand Down Expand Up @@ -2169,6 +2179,11 @@ public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
actual.onCacheHit(commandInstance);
}

@Override
public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
actual.onUnsubscribe(commandInstance);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private <T> HystrixCommand<T> getHystrixCommandFromAbstractIfApplicable(HystrixInvokable<T> commandInstance) {
if (commandInstance instanceof HystrixCommand) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,17 @@ public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
//do nothing by default
}

/**
* Invoked with the command is unsubscribed before a terminal state
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.5.9
*/
public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
//do nothing by default
}

/**
* DEPRECATED: Change usages of this to {@link #onExecutionStart}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,14 @@ public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
recordHookCall(executionSequence, "onCacheHit");
}

@Override
public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
super.onUnsubscribe(commandInstance);
recordHookCall(executionSequence, "onUnsubscribe");
}

/**
* DEPRECATED METHODS FOLLOW. The string representation starts with `!D!` to distinguish
* DEPRECATED METHODS FOLLOW. The string representation starts with `!` to distinguish
*/

AtomicInteger startExecute = new AtomicInteger();
Expand Down

0 comments on commit 9378b96

Please sign in to comment.