Skip to content

Commit

Permalink
Merge branch 'master' of github.com:Netflix/Hystrix
Browse files Browse the repository at this point in the history
  • Loading branch information
robertroeser committed May 23, 2016
2 parents 285e4ea + 7d3afbf commit bb8f153
Show file tree
Hide file tree
Showing 16 changed files with 1,976 additions and 520 deletions.
231 changes: 108 additions & 123 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,96 +3,49 @@
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subjects.ReplaySubject;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class HystrixCachedObservable<R> {
final AbstractCommand<R> originalCommand;
final Observable<R> cachedObservable;
final Subscription originalSubscription;
final ReplaySubject<R> replaySubject = ReplaySubject.create();
final AtomicInteger outstandingSubscriptions = new AtomicInteger(0);
protected final Subscription originalSubscription;
protected final Observable<R> cachedObservable;
private volatile int outstandingSubscriptions = 0;

/* package-private */ HystrixCachedObservable(Observable<R> originalObservable, final AbstractCommand<R> originalCommand) {
protected HystrixCachedObservable(final Observable<R> originalObservable) {
ReplaySubject<R> replaySubject = ReplaySubject.create();
this.originalSubscription = originalObservable
.subscribe(replaySubject);

this.cachedObservable = replaySubject
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (outstandingSubscriptions.decrementAndGet() == 0) {
outstandingSubscriptions--;
if (outstandingSubscriptions == 0) {
originalSubscription.unsubscribe();
}
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
outstandingSubscriptions.getAndIncrement();
outstandingSubscriptions++;
}
});
this.originalCommand = originalCommand;
}

public static <R> HystrixCachedObservable<R> from(Observable<R> o, AbstractCommand<R> originalCommand) {
return new HystrixCachedObservable<R>(o, originalCommand);
}

public static <R> HystrixCachedObservable<R> from(Observable<R> o, HystrixCollapser<?, R, ?> originalCollapser) {
return new HystrixCachedObservable<R>(o, null); //???
return new HystrixCommandResponseFromCache<R>(o, originalCommand);
}

public static <R> HystrixCachedObservable<R> from(Observable<R> o, HystrixObservableCollapser<?, ?, R, ?> originalCollapser) {
return new HystrixCachedObservable<R>(o, null); //???
public static <R> HystrixCachedObservable<R> from(Observable<R> o) {
return new HystrixCachedObservable<R>(o);
}

public Observable<R> toObservable() {
return cachedObservable;
}

public Observable<R> toObservable(final AbstractCommand<R> commandToCopyStateInto) {
final AtomicBoolean completionLogicRun = new AtomicBoolean(false);

return cachedObservable
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (!completionLogicRun.get()) {
commandCompleted(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
if (!completionLogicRun.get()) {
commandCompleted(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (!completionLogicRun.get()) {
commandUnsubscribed(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
});
}

private void commandCompleted(final AbstractCommand<R> commandToCopyStateInto) {
commandToCopyStateInto.executionResult = originalCommand.executionResult;
}

private void commandUnsubscribed(final AbstractCommand<R> commandToCopyStateInto) {
commandToCopyStateInto.executionResult = commandToCopyStateInto.executionResult.addEvent(HystrixEventType.CANCELLED);
commandToCopyStateInto.executionResult = commandToCopyStateInto.executionResult.setExecutionLatency(-1);
public void unsubscribe() {
originalSubscription.unsubscribe();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;
Expand Down Expand Up @@ -331,12 +333,17 @@ protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentTy
* to transform the {@code <BatchReturnType>} into {@code <ResponseType>}
*/
public Observable<ResponseType> observe() {
// us a ReplaySubject to buffer the eagerly subscribed-to Observable
// use a ReplaySubject to buffer the eagerly subscribed-to Observable
ReplaySubject<ResponseType> subject = ReplaySubject.create();
// eagerly kick off subscription
toObservable().subscribe(subject);
final Subscription underlyingSubscription = toObservable().subscribe(subject);
// return the subject that can be subscribed to later while the execution has already started
return subject;
return subject.doOnUnsubscribe(new Action0() {
@Override
public void call() {
underlyingSubscription.unsubscribe();
}
});
}

/**
Expand Down Expand Up @@ -373,20 +380,23 @@ public Observable<ResponseType> toObservable() {
public Observable<ResponseType> toObservable(Scheduler observeOn) {

final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get();
final String cacheKey = getCacheKey();

/* try from cache first */
if (isRequestCacheEnabled) {
HystrixCachedObservable<ResponseType> fromCache = requestCache.get(getCacheKey());
HystrixCachedObservable<ResponseType> fromCache = requestCache.get(cacheKey);
if (fromCache != null) {
metrics.markResponseFromCache();
return fromCache.toObservable();
}
}

final HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> _self = this;

RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
metrics.markRequestBatched();
if (isRequestCacheEnabled) {

if (isRequestCacheEnabled && cacheKey != null) {
/*
* A race can occur here with multiple threads queuing but only one will be cached.
* This means we can have some duplication of requests in a thread-race but we're okay
Expand All @@ -396,8 +406,8 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {
* If this is an issue we can make a lazy-future that gets set in the cache
* then only the winning 'put' will be invoked to actually call 'submitRequest'
*/
HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response, this);
HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), toCache);
HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response);
HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache == null) {
return toCache.toObservable();
} else {
Expand Down Expand Up @@ -449,8 +459,9 @@ public ResponseType execute() {
* within an <code>ExecutionException.getCause()</code> (thrown by {@link Future#get}) if an error occurs and a fallback cannot be retrieved
*/
public Future<ResponseType> queue() {
final Observable<ResponseType> o = toObservable();
return o.toBlocking().toFuture();
return toObservable()
.toBlocking()
.toFuture();
}

/**
Expand Down Expand Up @@ -501,7 +512,7 @@ public interface CollapsedRequest<ResponseType, RequestArgumentType> {
*
* @return RequestArgumentType
*/
public RequestArgumentType getArgument();
RequestArgumentType getArgument();

/**
* This corresponds in a OnNext(Response); OnCompleted pair of emissions. It represents a single-value usecase.
Expand All @@ -511,15 +522,15 @@ public interface CollapsedRequest<ResponseType, RequestArgumentType> {
* @param response
* ResponseType
*/
public void setResponse(ResponseType response);
void setResponse(ResponseType response);

/**
* When invoked, any Observer will be OnNexted this value
* @throws IllegalStateException
* if called after setException/setResponse/setComplete.
* @param response
*/
public void emitResponse(ResponseType response);
void emitResponse(ResponseType response);

/**
* When set, any Observer will be OnErrored this exception
Expand All @@ -528,7 +539,7 @@ public interface CollapsedRequest<ResponseType, RequestArgumentType> {
* @throws IllegalStateException
* if called more than once or after setResponse/setComplete.
*/
public void setException(Exception exception);
void setException(Exception exception);

/**
* When set, any Observer will have an OnCompleted emitted.
Expand All @@ -537,7 +548,7 @@ public interface CollapsedRequest<ResponseType, RequestArgumentType> {
* Note that, unlike the other 3 methods above, this method does not throw an IllegalStateException.
* This allows Hystrix-core to unilaterally call it without knowing the internal state.
*/
public void setComplete();
void setComplete();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.netflix.hystrix;

import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;

import java.util.concurrent.atomic.AtomicBoolean;

public class HystrixCommandResponseFromCache<R> extends HystrixCachedObservable<R> {
private final AbstractCommand<R> originalCommand;

/* package-private */ HystrixCommandResponseFromCache(Observable<R> originalObservable, final AbstractCommand<R> originalCommand) {
super(originalObservable);
this.originalCommand = originalCommand;
}

public Observable<R> toObservableWithStateCopiedInto(final AbstractCommand<R> commandToCopyStateInto) {
final AtomicBoolean completionLogicRun = new AtomicBoolean(false);

return cachedObservable
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
if (!completionLogicRun.get()) {
commandCompleted(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
if (!completionLogicRun.get()) {
commandCompleted(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (!completionLogicRun.get()) {
commandUnsubscribed(commandToCopyStateInto);
completionLogicRun.set(true);
}
}
});
}

private void commandCompleted(final AbstractCommand<R> commandToCopyStateInto) {
commandToCopyStateInto.executionResult = originalCommand.executionResult;
}

private void commandUnsubscribed(final AbstractCommand<R> commandToCopyStateInto) {
commandToCopyStateInto.executionResult = commandToCopyStateInto.executionResult.addEvent(HystrixEventType.CANCELLED);
commandToCopyStateInto.executionResult = commandToCopyStateInto.executionResult.setExecutionLatency(-1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
Expand Down Expand Up @@ -392,12 +393,17 @@ protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentTy
* the {@code <BatchReturnType>} into {@code <ResponseType>}
*/
public Observable<ResponseType> observe() {
// us a ReplaySubject to buffer the eagerly subscribed-to Observable
// use a ReplaySubject to buffer the eagerly subscribed-to Observable
ReplaySubject<ResponseType> subject = ReplaySubject.create();
// eagerly kick off subscription
toObservable().subscribe(subject);
final Subscription underlyingSubscription = toObservable().subscribe(subject);
// return the subject that can be subscribed to later while the execution has already started
return subject;
return subject.doOnUnsubscribe(new Action0() {
@Override
public void call() {
underlyingSubscription.unsubscribe();
}
});
}

/**
Expand Down Expand Up @@ -457,7 +463,7 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {
* If this is an issue we can make a lazy-future that gets set in the cache
* then only the winning 'put' will be invoked to actually call 'submitRequest'
*/
HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response, this);
HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response);
HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), toCache);
if (fromCache == null) {
return toCache.toObservable();
Expand Down
Loading

0 comments on commit bb8f153

Please sign in to comment.