From fb96410f8df4d59d59ad5006c687c00ba0459517 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 22 Apr 2014 16:44:15 -0700 Subject: [PATCH] HystrixObservableCollapser Make the collapser support non-blocking HystrixObservableCommand --- .../com/netflix/hystrix/HystrixCollapser.java | 4 +- .../hystrix/HystrixObservableCollapser.java | 534 ++++++++++++++++++ .../collapser/RequestCollapserFactory.java | 15 +- .../netflix/hystrix/HystrixCollapserTest.java | 2 +- .../HystrixObservableCollapserTest.java | 204 +++++++ .../hystrix/HystrixObservableCommandTest.java | 2 +- 6 files changed, 754 insertions(+), 7 deletions(-) create mode 100644 hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java create mode 100644 hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java index 6f71d0556..3f1b5b398 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java @@ -78,7 +78,7 @@ public abstract class HystrixCollapserGLOBAL: Requests from any thread (ie. all HTTP requests) within the JVM will be collapsed. 1 queue for entire app. * */ - public static enum Scope { + public static enum Scope implements RequestCollapserFactory.Scope { REQUEST, GLOBAL } @@ -185,7 +185,7 @@ public HystrixCollapserKey getCollapserKey() { * @return {@link Scope} that collapsing should be performed within. */ public Scope getScope() { - return collapserFactory.getScope(); + return Scope.valueOf(collapserFactory.getScope().name()); } /** diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java new file mode 100644 index 000000000..8011422b3 --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java @@ -0,0 +1,534 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; + +import javax.annotation.concurrent.NotThreadSafe; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import rx.Observable; +import rx.Scheduler; +import rx.schedulers.Schedulers; +import rx.subjects.ReplaySubject; + +import com.netflix.hystrix.HystrixCollapser.CollapsedRequest; +import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy; +import com.netflix.hystrix.collapser.CollapserTimer; +import com.netflix.hystrix.collapser.HystrixCollapserBridge; +import com.netflix.hystrix.collapser.RealCollapserTimer; +import com.netflix.hystrix.collapser.RequestCollapser; +import com.netflix.hystrix.collapser.RequestCollapserFactory; +import com.netflix.hystrix.exception.HystrixRuntimeException; +import com.netflix.hystrix.strategy.HystrixPlugins; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; +import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy; + +/** + * Collapse multiple requests into a single {@link HystrixCommand} execution based on a time window and optionally a max batch size. + *

+ * This allows an object model to have multiple calls to the command that execute/queue many times in a short period (milliseconds) and have them all get batched into a single backend call. + *

+ * Typically the time window is something like 10ms give or take. + *

+ * NOTE: Do NOT retain any state within instances of this class. + *

+ * It must be stateless or else it will be non-deterministic because most instances are discarded while some are retained and become the + * "collapsers" for all the ones that are discarded. + * + * @param + * The type returned from the {@link HystrixCommand} that will be invoked on batch executions. + * @param + * The type returned from this command. + * @param + * The type of the request argument. If multiple arguments are needed, wrap them in another object or a Tuple. + */ +public abstract class HystrixObservableCollapser implements HystrixExecutable { + + static final Logger logger = LoggerFactory.getLogger(HystrixObservableCollapser.class); + + private final RequestCollapserFactory collapserFactory; + private final HystrixRequestCache requestCache; + private final HystrixCollapserBridge collapserInstanceWrapper; + + /** + * The scope of request collapsing. + *

+ */ + public static enum Scope implements RequestCollapserFactory.Scope { + REQUEST, GLOBAL + } + + /** + * Collapser with default {@link HystrixCollapserKey} derived from the implementing class name and scoped to {@link Scope#REQUEST} and default configuration. + */ + protected HystrixObservableCollapser() { + this(Setter.withCollapserKey(null).andScope(Scope.REQUEST)); + } + + /** + * Collapser scoped to {@link Scope#REQUEST} and default configuration. + * + * @param collapserKey + * {@link HystrixCollapserKey} that identifies this collapser and provides the key used for retrieving properties, request caches, publishing metrics etc. + */ + protected HystrixObservableCollapser(HystrixCollapserKey collapserKey) { + this(Setter.withCollapserKey(collapserKey).andScope(Scope.REQUEST)); + } + + /** + * Construct a {@link HystrixObservableCollapser} with defined {@link Setter} that allows + * injecting property and strategy overrides and other optional arguments. + *

+ * Null values will result in the default being used. + * + * @param setter + * Fluent interface for constructor arguments + */ + protected HystrixObservableCollapser(Setter setter) { + this(setter.collapserKey, setter.scope, new RealCollapserTimer(), setter.propertiesSetter); + } + + /* package for tests */HystrixObservableCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder) { + if (collapserKey == null || collapserKey.name().trim().equals("")) { + String defaultKeyName = getDefaultNameFromClass(getClass()); + collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName); + } + + this.collapserFactory = new RequestCollapserFactory(collapserKey, scope, timer, propertiesBuilder); + this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy()); + + final HystrixObservableCollapser self = this; + + /** + * Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class. + */ + collapserInstanceWrapper = new HystrixCollapserBridge() { + + @Override + public Collection>> shardRequests(Collection> requests) { + return self.shardRequests(requests); + } + + @Override + public Observable createObservableCommand(Collection> requests) { + HystrixObservableCommand command = self.createCommand(requests); + + // mark the number of requests being collapsed together + command.markAsCollapsedCommand(requests.size()); + + return command.toObservable(); + } + + @Override + public void mapResponseToRequests(BatchReturnType batchResponse, Collection> requests) { + self.mapResponseToRequests(batchResponse, requests); + } + + @Override + public HystrixCollapserKey getCollapserKey() { + return self.getCollapserKey(); + } + + }; + } + + private HystrixCollapserProperties getProperties() { + return collapserFactory.getProperties(); + } + + /** + * Key of the {@link HystrixObservableCollapser} used for properties, metrics, caches, reporting etc. + * + * @return {@link HystrixCollapserKey} identifying this {@link HystrixObservableCollapser} instance + */ + public HystrixCollapserKey getCollapserKey() { + return collapserFactory.getCollapserKey(); + } + + /** + * Scope of collapsing. + *

+ *

+ *

+ * Default: {@link Scope#REQUEST} (defined via constructor) + * + * @return {@link Scope} that collapsing should be performed within. + */ + public Scope getScope() { + return Scope.valueOf(collapserFactory.getScope().name()); + } + + /** + * The request arguments to be passed to the {@link HystrixCommand}. + *

+ * Typically this means to take the argument(s) provided to the constructor and return it here. + *

+ * If there are multiple arguments that need to be bundled, create a single object to contain them, or use a Tuple. + * + * @return RequestArgumentType + */ + public abstract RequestArgumentType getRequestArgument(); + + /** + * Factory method to create a new {@link HystrixObservableCommand}{@code } command object each time a batch needs to be executed. + *

+ * Do not return the same instance each time. Return a new instance on each invocation. + *

+ * Process the 'requests' argument into the arguments the command object needs to perform its work. + *

+ * If a batch or requests needs to be split (sharded) into multiple commands, see {@link #shardRequests}

+ * IMPLEMENTATION NOTE: Be fast (ie. <1ms) in this method otherwise it can block the Timer from executing subsequent batches. Do not do any processing beyond constructing the command and returning + * it. + * + * @param requests + * {@code Collection>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch. + * @return {@link HystrixObservableCommand}{@code } which when executed will retrieve results for the batch of arguments as found in the Collection of {@link CollapsedRequest} objects + */ + protected abstract HystrixObservableCommand createCommand(Collection> requests); + + /** + * Override to split (shard) a batch of requests into multiple batches that will each call createCommand separately. + *

+ * The purpose of this is to allow collapsing to work for services that have sharded backends and batch executions that need to be shard-aware. + *

+ * For example, a batch of 100 requests could be split into 4 different batches sharded on name (ie. a-g, h-n, o-t, u-z) that each result in a separate {@link HystrixCommand} being created and + * executed for them. + *

+ * By default this method does nothing to the Collection and is a pass-thru. + * + * @param requests + * {@code Collection>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch. + * @return Collection of {@code Collection>} objects sharded according to business rules. + *

The CollapsedRequest instances should not be modified or wrapped as the CollapsedRequest instance object contains state information needed to complete the execution. + */ + protected Collection>> shardRequests(Collection> requests) { + return Collections.singletonList(requests); + } + + /** + * Executed after the {@link HystrixCommand}{@code } command created by {@link #createCommand} finishes processing (unless it fails) for mapping the {@code } to + * the list of {@code CollapsedRequest} objects. + *

+ * IMPORTANT IMPLEMENTATION DETAIL => The expected contract (responsibilities) of this method implementation is: + *

+ *

+ *

+ * Common code when {@code } is {@code List} is: + *

+ * + *

+     * int count = 0;
+     * for ({@code CollapsedRequest} request : requests) {
+     *      request.setResponse(batchResponse.get(count++));
+     * }
+     * 
+ * + * For example if the types were {@code , String, String>}: + *

+ * + *

+     * int count = 0;
+     * for ({@code CollapsedRequest} request : requests) {
+     *      request.setResponse(batchResponse.get(count++));
+     * }
+     * 
+ * + * @param batchResponse + * The {@code } returned from the {@link HystrixCommand}{@code } command created by {@link #createCommand}. + *

+ * + * @param requests + * {@code Collection>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch. + *

+ * The {@link CollapsedRequest#setResponse(Object)} or {@link CollapsedRequest#setException(Exception)} must be called on each {@link CollapsedRequest} in the Collection. + */ + protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection> requests); + + /** + * Used for asynchronous execution with a callback by subscribing to the {@link Observable}. + *

+ * This eagerly starts execution the same as {@link #queue()} and {@link #execute()}. + * A lazy {@link Observable} can be obtained from {@link #toObservable()}. + *

+ * Callback Scheduling + *

+ *

    + *
  • When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.
  • + *
  • When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.
  • + *
+ * Use {@link #toObservable(rx.Scheduler)} to schedule the callback differently. + *

+ * See https://github.com/Netflix/RxJava/wiki for more information. + * + * @return {@code Observable} that executes and calls back with the result of of {@link HystrixCommand}{@code } execution after passing through {@link #mapResponseToRequests} + * to transform the {@code } into {@code } + */ + public Observable observe() { + // us a ReplaySubject to buffer the eagerly subscribed-to Observable + ReplaySubject subject = ReplaySubject.create(); + // eagerly kick off subscription + toObservable().subscribe(subject); + // return the subject that can be subscribed to later while the execution has already started + return subject; + } + + /** + * A lazy {@link Observable} that will execute when subscribed to. + *

+ * Callback Scheduling + *

+ *

    + *
  • When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.
  • + *
  • When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.
  • + *
+ *

+ * See https://github.com/Netflix/RxJava/wiki for more information. + * + * @return {@code Observable} that lazily executes and calls back with the result of of {@link HystrixCommand}{@code } execution after passing through + * {@link #mapResponseToRequests} to transform the {@code } into {@code } + */ + public Observable toObservable() { + // when we callback with the data we want to do the work + // on a separate thread than the one giving us the callback + return toObservable(Schedulers.computation()); + } + + /** + * A lazy {@link Observable} that will execute when subscribed to. + *

+ * See https://github.com/Netflix/RxJava/wiki for more information. + * + * @param observeOn + * The {@link Scheduler} to execute callbacks on. + * @return {@code Observable} that lazily executes and calls back with the result of of {@link HystrixCommand}{@code } execution after passing through + * {@link #mapResponseToRequests} to transform the {@code } into {@code } + */ + public Observable toObservable(Scheduler observeOn) { + + /* try from cache first */ + if (getProperties().requestCachingEnabled().get()) { + Observable fromCache = requestCache.get(getCacheKey()); + if (fromCache != null) { + /* mark that we received this response from cache */ + // TODO Add collapser metrics so we can capture this information + // we can't add it to the command metrics because the command can change each time (dynamic key for example) + // and we don't have access to it when responding from cache + // collapserMetrics.markResponseFromCache(); + return fromCache; + } + } + + RequestCollapser requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper); + Observable response = requestCollapser.submitRequest(getRequestArgument()); + if (getProperties().requestCachingEnabled().get()) { + /* + * A race can occur here with multiple threads queuing but only one will be cached. + * This means we can have some duplication of requests in a thread-race but we're okay + * with having some inefficiency in duplicate requests in the same batch + * and then subsequent requests will retrieve a previously cached Observable. + * + * If this is an issue we can make a lazy-future that gets set in the cache + * then only the winning 'put' will be invoked to actually call 'submitRequest' + */ + Observable o = response.cache(); + Observable fromCache = requestCache.putIfAbsent(getCacheKey(), o); + if (fromCache == null) { + response = o; + } else { + response = fromCache; + } + } + return response; + } + + /** + * Used for synchronous execution. + *

+ * If {@link Scope#REQUEST} is being used then synchronous execution will only result in collapsing if other threads are running within the same scope. + * + * @return ResponseType + * Result of {@link HystrixCommand}{@code } execution after passing through {@link #mapResponseToRequests} to transform the {@code } into + * {@code } + * @throws HystrixRuntimeException + * if an error occurs and a fallback cannot be retrieved + */ + public ResponseType execute() { + try { + return queue().get(); + } catch (Throwable e) { + if (e instanceof HystrixRuntimeException) { + throw (HystrixRuntimeException) e; + } + // if we have an exception we know about we'll throw it directly without the threading wrapper exception + if (e.getCause() instanceof HystrixRuntimeException) { + throw (HystrixRuntimeException) e.getCause(); + } + // we don't know what kind of exception this is so create a generic message and throw a new HystrixRuntimeException + String message = getClass().getSimpleName() + " HystrixCollapser failed while executing."; + logger.debug(message, e); // debug only since we're throwing the exception and someone higher will do something with it + //TODO should this be made a HystrixRuntimeException? + throw new RuntimeException(message, e); + } + } + + /** + * Used for asynchronous execution. + *

+ * This will queue up the command and return a Future to get the result once it completes. + * + * @return ResponseType + * Result of {@link HystrixCommand}{@code } execution after passing through {@link #mapResponseToRequests} to transform the {@code } into + * {@code } + * @throws HystrixRuntimeException + * within an ExecutionException.getCause() (thrown by {@link Future#get}) if an error occurs and a fallback cannot be retrieved + */ + public Future queue() { + final Observable o = toObservable(); + return o.toBlockingObservable().toFuture(); + } + + /** + * Key to be used for request caching. + *

+ * By default this returns null which means "do not cache". + *

+ * To enable caching override this method and return a string key uniquely representing the state of a command instance. + *

+ * If multiple command instances in the same request scope match keys then only the first will be executed and all others returned from cache. + * + * @return String cacheKey or null if not to cache + */ + protected String getCacheKey() { + return null; + } + + /** + * Clears all state. If new requests come in instances will be recreated and metrics started from scratch. + */ + /* package */static void reset() { + RequestCollapserFactory.reset(); + } + + private static String getDefaultNameFromClass(@SuppressWarnings("rawtypes") Class cls) { + String fromCache = defaultNameCache.get(cls); + if (fromCache != null) { + return fromCache; + } + // generate the default + // default HystrixCommandKey to use if the method is not overridden + String name = cls.getSimpleName(); + if (name.equals("")) { + // we don't have a SimpleName (anonymous inner class) so use the full class name + name = cls.getName(); + name = name.substring(name.lastIndexOf('.') + 1, name.length()); + } + defaultNameCache.put(cls, name); + return name; + } + + /** + * Fluent interface for arguments to the {@link HystrixObservableCollapser} constructor. + *

+ * The required arguments are set via the 'with' factory method and optional arguments via the 'and' chained methods. + *

+ * Example: + *

 {@code
+     *  Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("CollapserName"))
+                .andScope(Scope.REQUEST);
+     * } 
+ */ + @NotThreadSafe + public static class Setter { + private final HystrixCollapserKey collapserKey; + private Scope scope = Scope.REQUEST; // default if nothing is set + private HystrixCollapserProperties.Setter propertiesSetter; + + private Setter(HystrixCollapserKey collapserKey) { + this.collapserKey = collapserKey; + } + + /** + * Setter factory method containing required values. + *

+ * All optional arguments can be set via the chained methods. + * + * @param collapserKey + * {@link HystrixCollapserKey} that identifies this collapser and provides the key used for retrieving properties, request caches, publishing metrics etc. + * @return Setter for fluent interface via method chaining + */ + public static Setter withCollapserKey(HystrixCollapserKey collapserKey) { + return new Setter(collapserKey); + } + + /** + * {@link Scope} defining what scope the collapsing should occur within + * + * @param scope + * + * @return Setter for fluent interface via method chaining + */ + public Setter andScope(Scope scope) { + this.scope = scope; + return this; + } + + /** + * @param propertiesSetter + * {@link HystrixCollapserProperties.Setter} that allows instance specific property overrides (which can then be overridden by dynamic properties, see + * {@link HystrixPropertiesStrategy} for + * information on order of precedence). + *

+ * Will use defaults if left NULL. + * @return Setter for fluent interface via method chaining + */ + public Setter andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter propertiesSetter) { + this.propertiesSetter = propertiesSetter; + return this; + } + + } + + // this is a micro-optimization but saves about 1-2microseconds (on 2011 MacBook Pro) + // on the repetitive string processing that will occur on the same classes over and over again + @SuppressWarnings("rawtypes") + private static ConcurrentHashMap, String> defaultNameCache = new ConcurrentHashMap, String>(); + +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapserFactory.java b/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapserFactory.java index 0129fc780..614006dc4 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapserFactory.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapserFactory.java @@ -36,6 +36,15 @@ public class RequestCollapserFactory getRequestCollapser(HystrixCollapserBridge commandCollapser) { - if (Scope.REQUEST == getScope()) { + if (Scopes.REQUEST == Scopes.valueOf(getScope().name())) { return getCollapserForUserRequest(commandCollapser); - } else if (Scope.GLOBAL == getScope()) { + } else if (Scopes.GLOBAL == Scopes.valueOf(getScope().name())) { return getCollapserForGlobalScope(commandCollapser); } else { logger.warn("Invalid Scope: " + getScope() + " Defaulting to REQUEST scope."); @@ -210,7 +219,7 @@ public void shutdown(RequestCollapser tasks = new ConcurrentLinkedQueue(); diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java new file mode 100644 index 000000000..cfbd3d24a --- /dev/null +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java @@ -0,0 +1,204 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Subscriber; +import rx.schedulers.Schedulers; + +import com.netflix.hystrix.HystrixCollapser.CollapsedRequest; +import com.netflix.hystrix.HystrixCollapserTest.TestCollapserTimer; +import com.netflix.hystrix.HystrixObservableCommandTest.TestHystrixCommand; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; + +public class HystrixObservableCollapserTest { + static AtomicInteger counter = new AtomicInteger(); + + @Before + public void init() { + counter.set(0); + // since we're going to modify properties of the same class between tests, wipe the cache each time + HystrixCollapser.reset(); + /* we must call this to simulate a new request lifecycle running and clearing caches */ + HystrixRequestContext.initializeContext(); + } + + @After + public void cleanup() { + // instead of storing the reference from initialize we'll just get the current state and shutdown + if (HystrixRequestContext.getContextForCurrentThread() != null) { + // it may be null if a test shuts the context down manually + HystrixRequestContext.getContextForCurrentThread().shutdown(); + } + } + + @Test + public void testTwoRequests() throws Exception { + TestCollapserTimer timer = new TestCollapserTimer(); + Future response1 = new TestRequestCollapser(timer, counter, 1).queue(); + Future response2 = new TestRequestCollapser(timer, counter, 2).queue(); + timer.incrementTime(10); // let time pass that equals the default delay/period + + assertEquals("1", response1.get()); + assertEquals("2", response2.get()); + + assertEquals(1, counter.get()); + + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + } + + private static class TestRequestCollapser extends HystrixObservableCollapser, String, String> { + + private final AtomicInteger count; + private final String value; + private ConcurrentLinkedQueue>> commandsExecuted; + + public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, int value) { + this(timer, counter, String.valueOf(value)); + } + + public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value) { + this(timer, counter, value, 10000, 10); + } + + public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, ConcurrentLinkedQueue>> executionLog) { + this(timer, counter, value, 10000, 10, executionLog); + } + + public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, int value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { + this(timer, counter, String.valueOf(value), defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds); + } + + public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { + this(timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null); + } + + public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { + this(scope, timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null); + } + + public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue>> executionLog) { + this(Scope.REQUEST, timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, executionLog); + } + + public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue>> executionLog) { + // use a CollapserKey based on the CollapserTimer object reference so it's unique for each timer as we don't want caching + // of properties to occur and we're using the default HystrixProperty which typically does caching + super(collapserKeyFromString(timer), scope, timer, HystrixCollapserProperties.Setter().withMaxRequestsInBatch(defaultMaxRequestsInBatch).withTimerDelayInMilliseconds(defaultTimerDelayInMilliseconds)); + this.count = counter; + this.value = value; + this.commandsExecuted = executionLog; + } + + @Override + public String getRequestArgument() { + return value; + } + + @Override + public HystrixObservableCommand> createCommand(final Collection> requests) { + /* return a mocked command */ + HystrixObservableCommand> command = new TestCollapserCommand(requests); + if (commandsExecuted != null) { + commandsExecuted.add(command); + } + return command; + } + + @Override + public void mapResponseToRequests(List batchResponse, Collection> requests) { + // count how many times a batch is executed (this method is executed once per batch) + System.out.println("increment count: " + count.incrementAndGet()); + + // for simplicity I'll assume it's a 1:1 mapping between lists ... in real implementations they often need to index to maps + // to allow random access as the response size does not match the request size + if (batchResponse.size() != requests.size()) { + throw new RuntimeException("lists don't match in size => " + batchResponse.size() + " : " + requests.size()); + } + int i = 0; + for (CollapsedRequest request : requests) { + request.setResponse(batchResponse.get(i++)); + } + + } + + } + + private static HystrixCollapserKey collapserKeyFromString(final Object o) { + return new HystrixCollapserKey() { + + @Override + public String name() { + return String.valueOf(o); + } + + }; + } + + private static class TestCollapserCommand extends TestHystrixCommand> { + + private final Collection> requests; + + TestCollapserCommand(Collection> requests) { + super(testPropsBuilder().setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter().withExecutionIsolationThreadTimeoutInMilliseconds(50))); + this.requests = requests; + } + + @Override + protected Observable> run() { + return Observable.create(new OnSubscribe>() { + + @Override + public void call(Subscriber> s) { + System.out.println(">>> TestCollapserCommand run() ... batch size: " + requests.size()); + // simulate a batch request + ArrayList response = new ArrayList(); + for (CollapsedRequest request : requests) { + if (request.getArgument() == null) { + throw new NullPointerException("Simulated Error"); + } + if (request.getArgument() == "TIMEOUT") { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + response.add(request.getArgument()); + } + s.onNext(response); + s.onCompleted(); + } + + }).subscribeOn(Schedulers.computation()); + } + + } +} diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java index b7579e7c3..d136350b2 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java @@ -6119,7 +6119,7 @@ public void testTimeoutWithFallbackRequestContextWithThreadIsolatedAsynchronousO /** * Used by UnitTest command implementations to provide base defaults for constructor and a builder pattern for the arguments being passed in. */ - private static abstract class TestHystrixCommand extends HystrixObservableCommand { + /* package */ static abstract class TestHystrixCommand extends HystrixObservableCommand { final TestCommandBuilder builder;