From f787e1268c853362843a9217c09b38d1430855a1 Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Mon, 11 Jul 2016 11:05:31 -0700 Subject: [PATCH] Moved all work performed in HystrixObservableCommand.toObservable inside an Observable.defer to make it lazy --- .../hystrix/HystrixObservableCollapser.java | 68 ++++++++++--------- 1 file changed, 37 insertions(+), 31 deletions(-) diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java index 9f32fc734..a50e7cca7 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java @@ -34,6 +34,7 @@ import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; +import rx.functions.Func0; import rx.functions.Func1; import rx.schedulers.Schedulers; import rx.subjects.ReplaySubject; @@ -439,39 +440,44 @@ public Observable toObservable() { */ public Observable toObservable(Scheduler observeOn) { - final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get(); - - /* try from cache first */ - if (isRequestCacheEnabled) { - HystrixCachedObservable fromCache = requestCache.get(getCacheKey()); - if (fromCache != null) { - metrics.markResponseFromCache(); - return fromCache.toObservable(); - } - } + return Observable.defer(new Func0>() { + @Override + public Observable call() { + final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get(); + + /* try from cache first */ + if (isRequestCacheEnabled) { + HystrixCachedObservable fromCache = requestCache.get(getCacheKey()); + if (fromCache != null) { + metrics.markResponseFromCache(); + return fromCache.toObservable(); + } + } - RequestCollapser requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper); - Observable response = requestCollapser.submitRequest(getRequestArgument()); - metrics.markRequestBatched(); - if (isRequestCacheEnabled) { - /* - * A race can occur here with multiple threads queuing but only one will be cached. - * This means we can have some duplication of requests in a thread-race but we're okay - * with having some inefficiency in duplicate requests in the same batch - * and then subsequent requests will retrieve a previously cached Observable. - * - * If this is an issue we can make a lazy-future that gets set in the cache - * then only the winning 'put' will be invoked to actually call 'submitRequest' - */ - HystrixCachedObservable toCache = HystrixCachedObservable.from(response); - HystrixCachedObservable fromCache = requestCache.putIfAbsent(getCacheKey(), toCache); - if (fromCache == null) { - return toCache.toObservable(); - } else { - return fromCache.toObservable(); + RequestCollapser requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper); + Observable response = requestCollapser.submitRequest(getRequestArgument()); + metrics.markRequestBatched(); + if (isRequestCacheEnabled) { + /* + * A race can occur here with multiple threads queuing but only one will be cached. + * This means we can have some duplication of requests in a thread-race but we're okay + * with having some inefficiency in duplicate requests in the same batch + * and then subsequent requests will retrieve a previously cached Observable. + * + * If this is an issue we can make a lazy-future that gets set in the cache + * then only the winning 'put' will be invoked to actually call 'submitRequest' + */ + HystrixCachedObservable toCache = HystrixCachedObservable.from(response); + HystrixCachedObservable fromCache = requestCache.putIfAbsent(getCacheKey(), toCache); + if (fromCache == null) { + return toCache.toObservable(); + } else { + return fromCache.toObservable(); + } + } + return response; } - } - return response; + }); } /**