From 657f698ab7a06bb69a65dd886b69c5d0e476c65c Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Mon, 11 Jul 2016 17:03:38 -0700 Subject: [PATCH] Prevent duplicate arguments from getting into a single collapser RequestBatch. If this is attempted, then 1 of 2 things can occur: If request-caching is on: the response for the 2nd-nth instance of the argument is the same as the first If off: the response for the 2nd-nth instance of the argument is an error --- .../com/netflix/hystrix/HystrixCollapser.java | 11 +- .../collapser/CollapsedRequestSubject.java | 11 +- .../hystrix/collapser/RequestBatch.java | 66 +++--- .../hystrix/collapser/RequestCollapser.java | 12 +- .../properties/HystrixPropertiesFactory.java | 1 + .../netflix/hystrix/HystrixCollapserTest.java | 217 +++++++++++++++++- .../HystrixObservableCollapserTest.java | 174 ++++++++++++++ 7 files changed, 442 insertions(+), 50 deletions(-) diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java index 744cb536c..c32063fe4 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java @@ -379,7 +379,6 @@ public Observable toObservable() { * {@link #mapResponseToRequests} to transform the {@code } into {@code } */ public Observable toObservable(Scheduler observeOn) { - return Observable.defer(new Func0>() { @Override public Observable call() { @@ -399,20 +398,12 @@ public Observable call() { Observable response = requestCollapser.submitRequest(getRequestArgument()); if (isRequestCacheEnabled && cacheKey != null) { - /* - * A race can occur here with multiple threads queuing but only one will be cached. - * This means we can have some duplication of requests in a thread-race but we're okay - * with having some inefficiency in duplicate requests in the same batch - * and then subsequent requests will retrieve a previously cached Observable. - * - * If this is an issue we can make a lazy-future that gets set in the cache - * then only the winning 'put' will be invoked to actually call 'submitRequest' - */ HystrixCachedObservable toCache = HystrixCachedObservable.from(response); HystrixCachedObservable fromCache = requestCache.putIfAbsent(cacheKey, toCache); if (fromCache == null) { return toCache.toObservable(); } else { + toCache.unsubscribe(); return fromCache.toObservable(); } } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/collapser/CollapsedRequestSubject.java b/hystrix-core/src/main/java/com/netflix/hystrix/collapser/CollapsedRequestSubject.java index a9d33484f..b31f036d4 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/collapser/CollapsedRequestSubject.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/collapser/CollapsedRequestSubject.java @@ -49,19 +49,19 @@ private final ReplaySubject subject = ReplaySubject.create(); private final Observable subjectWithAccounting; - private volatile boolean subscribedTo = false; private volatile int outstandingSubscriptions = 0; public CollapsedRequestSubject(final R arg, final RequestBatch containingBatch) { + if (arg == RequestCollapser.NULL_SENTINEL) { + this.argument = null; + } else { + this.argument = arg; + } this.subjectWithAccounting = subject .doOnSubscribe(new Action0() { @Override public void call() { outstandingSubscriptions++; - if (!subscribedTo) { - subscribedTo = true; - //containingBatch.add(arg, this); - } } }) .doOnUnsubscribe(new Action0() { @@ -73,7 +73,6 @@ public void call() { } } }); - this.argument = arg; } public CollapsedRequestSubject(final R arg) { diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestBatch.java b/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestBatch.java index 12d5748e9..98582922a 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestBatch.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestBatch.java @@ -16,9 +16,9 @@ package com.netflix.hystrix.collapser; import java.util.Collection; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.slf4j.Logger; @@ -46,13 +46,14 @@ public class RequestBatch { private final int maxBatchSize; private final AtomicBoolean batchStarted = new AtomicBoolean(); - private final ConcurrentLinkedQueue> batchArgumentQueue = - new ConcurrentLinkedQueue>(); - private final AtomicInteger count = new AtomicInteger(0); + private final ConcurrentMap> argumentMap = + new ConcurrentHashMap>(); + private final HystrixCollapserProperties properties; private ReentrantReadWriteLock batchLock = new ReentrantReadWriteLock(); public RequestBatch(HystrixCollapserProperties properties, HystrixCollapserBridge commandCollapser, int maxBatchSize) { + this.properties = properties; this.commandCollapser = commandCollapser; this.maxBatchSize = maxBatchSize; } @@ -76,14 +77,35 @@ public Observable offer(RequestArgumentType arg) { return null; } - if (count.get() >= maxBatchSize) { + if (argumentMap.size() >= maxBatchSize) { return null; } else { CollapsedRequestSubject collapsedRequest = new CollapsedRequestSubject(arg, this); - batchArgumentQueue.add(collapsedRequest); - count.incrementAndGet(); - return collapsedRequest.toObservable(); + final CollapsedRequestSubject existing = (CollapsedRequestSubject) argumentMap.putIfAbsent(arg, collapsedRequest); + /** + * If the argument already exists in the batch, then there are 2 options: + * A) If request caching is ON (the default): only keep 1 argument in the batch and let all responses + * be hooked up to that argument + * B) If request caching is OFF: return an error to all duplicate argument requests + * + * This maintains the invariant that each batch has no duplicate arguments. This prevents the impossible + * logic (in a user-provided mapResponseToRequests for HystrixCollapser and the internals of HystrixObservableCollapser) + * of trying to figure out which argument of a set of duplicates should get attached to a response. + * + * See https://github.com/Netflix/Hystrix/pull/1176 for further discussion. + */ + if (existing != null) { + boolean requestCachingEnabled = properties.requestCacheEnabled().get(); + if (requestCachingEnabled) { + return existing.toObservable(); + } else { + return Observable.error(new IllegalArgumentException("Duplicate argument in collapser batch : [" + arg + "] This is not supported. Please turn request-caching on for HystrixCollapser:" + commandCollapser.getCollapserKey().name() + " or prevent duplicates from making it into the batch!")); + } + } else { + return collapsedRequest.toObservable(); + } + } } finally { batchLock.readLock().unlock(); @@ -95,10 +117,8 @@ public Observable offer(RequestArgumentType arg) { /** * Best-effort attempt to remove an argument from a batch. This may get invoked when a cancellation occurs somewhere downstream. - * This method finds the first occurrence of an argument in the batch, and removes that occurrence. + * This method finds the argument in the batch, and removes it. * - * This is currently O(n). If an O(1) approach is needed, then we need to refactor internals to use a Map instead of Queue. - * My first pass at this is fairly naive, on the suspicion that unsubscription will be rare enough to not cause a perf problem. * @param arg argument to remove from batch */ /* package-private */ void remove(RequestArgumentType arg) { @@ -114,13 +134,7 @@ public Observable offer(RequestArgumentType arg) { return; } - for (CollapsedRequest collapsedRequest: batchArgumentQueue) { - if (arg.equals(collapsedRequest.getArgument())) { - batchArgumentQueue.remove(collapsedRequest); - count.decrementAndGet(); - return; //just remove a single instance - } - } + argumentMap.remove(arg); } finally { batchLock.readLock().unlock(); } @@ -150,7 +164,7 @@ public void executeBatchIfNotAlreadyStarted() { try { // shard batches - Collection>> shards = commandCollapser.shardRequests(batchArgumentQueue); + Collection>> shards = commandCollapser.shardRequests(argumentMap.values()); // for each shard execute its requests for (final Collection> shardRequests : shards) { try { @@ -173,7 +187,7 @@ public void call(Throwable e) { } logger.debug("Exception mapping responses to requests.", e); // if a failure occurs we want to pass that exception to all of the Futures that we've returned - for (CollapsedRequest request : batchArgumentQueue) { + for (CollapsedRequest request : argumentMap.values()) { try { ((CollapsedRequestSubject) request).setExceptionIfResponseNotReceived(ee); } catch (IllegalStateException e2) { @@ -221,7 +235,7 @@ public void call() { } catch (Exception e) { logger.error("Exception while sharding requests.", e); // same error handling as we do around the shards, but this is a wider net in case the shardRequest method fails - for (CollapsedRequest request : batchArgumentQueue) { + for (CollapsedRequest request : argumentMap.values()) { try { request.setException(e); } catch (IllegalStateException e2) { @@ -241,8 +255,8 @@ public void shutdown() { batchLock.writeLock().lock(); try { // if we win the 'start' and once we have the lock we can now shut it down otherwise another thread will finish executing this batch - if (count.get() > 0) { - logger.warn("Requests still exist in queue but will not be executed due to RequestCollapser shutdown: " + count.get(), new IllegalStateException()); + if (argumentMap.size() > 0) { + logger.warn("Requests still exist in queue but will not be executed due to RequestCollapser shutdown: " + argumentMap.size(), new IllegalStateException()); /* * In the event that there is a concurrency bug or thread scheduling prevents the timer from ticking we need to handle this so the Future.get() calls do not block. * @@ -250,7 +264,7 @@ public void shutdown() { * * This safety-net just prevents the CollapsedRequestFutureImpl.get() from waiting on the CountDownLatch until its max timeout. */ - for (CollapsedRequest request : batchArgumentQueue) { + for (CollapsedRequest request : argumentMap.values()) { try { ((CollapsedRequestSubject) request).setExceptionIfResponseNotReceived(new IllegalStateException("Requests not executed before shutdown.")); } catch (Exception e) { @@ -270,6 +284,6 @@ public void shutdown() { } public int getSize() { - return count.get(); + return argumentMap.size(); } } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapser.java index 9c0181573..783297d93 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapser.java @@ -41,6 +41,7 @@ */ public class RequestCollapser { static final Logger logger = LoggerFactory.getLogger(RequestCollapser.class); + static final Object NULL_SENTINEL = new Object(); private final HystrixCollapserBridge commandCollapser; // batch can be null once shutdown @@ -89,10 +90,15 @@ public Observable submitRequest(final RequestArgumentType arg) { return Observable.error(new IllegalStateException("Submitting requests after collapser is shutdown")); } - Observable f = b.offer(arg); + final Observable response; + if (arg != null) { + response = b.offer(arg); + } else { + response = b.offer( (RequestArgumentType) NULL_SENTINEL); + } // it will always get an Observable unless we hit the max batch size - if (f != null) { - return f; + if (response != null) { + return response; } else { // this batch can't accept requests so create a new one and set it if another thread doesn't beat us createNewBatchAndExecutePreviousIfNeeded(b); diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/properties/HystrixPropertiesFactory.java b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/properties/HystrixPropertiesFactory.java index 94edfec75..792d7ab09 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/strategy/properties/HystrixPropertiesFactory.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/strategy/properties/HystrixPropertiesFactory.java @@ -43,6 +43,7 @@ public class HystrixPropertiesFactory { public static void reset() { commandProperties.clear(); threadPoolProperties.clear(); + collapserProperties.clear(); } // String is CommandKey.name() (we can't use CommandKey directly as we can't guarantee it implements hashcode/equals correctly) diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java index fe3a54ee5..78fee72e9 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java @@ -26,13 +26,19 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler; import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; import com.netflix.hystrix.strategy.properties.HystrixPropertiesCollapserDefault; +import com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory; +import com.netflix.hystrix.util.HystrixTimer; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -50,6 +56,9 @@ import rx.Subscriber; import rx.Subscription; import rx.functions.Action0; +import rx.observers.Subscribers; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; import static org.junit.Assert.*; @@ -61,6 +70,7 @@ public class HystrixCollapserTest { public void init() { HystrixCollapserMetrics.reset(); HystrixCommandMetrics.reset(); + HystrixPropertiesFactory.reset(); } @Test @@ -170,6 +180,201 @@ public void testRequestsOverTime() throws Exception { assertEquals(1, cmdIterator.next().getNumberCollapsed()); } + class Pair { + final A a; + final B b; + + Pair(A a, B b) { + this.a = a; + this.b = b; + } + } + + class MyCommand extends HystrixCommand>> { + + private final List args; + + public MyCommand(List args) { + super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("BATCH"))); + this.args = args; + } + + @Override + protected List> run() throws Exception { + System.out.println("Executing batch command on : " + Thread.currentThread().getName() + " with args : " + args); + List> results = new ArrayList>(); + for (String arg: args) { + results.add(new Pair(arg, Integer.parseInt(arg))); + } + return results; + } + } + + class MyCollapser extends HystrixCollapser>, Integer, String> { + + private final String arg; + + MyCollapser(String arg, boolean reqCacheEnabled) { + super(HystrixCollapserKey.Factory.asKey("UNITTEST"), + Scope.REQUEST, + new RealCollapserTimer(), + HystrixCollapserProperties.Setter().withRequestCacheEnabled(reqCacheEnabled), + HystrixCollapserMetrics.getInstance(HystrixCollapserKey.Factory.asKey("UNITTEST"), + new HystrixPropertiesCollapserDefault(HystrixCollapserKey.Factory.asKey("UNITTEST"), + HystrixCollapserProperties.Setter()))); + this.arg = arg; + } + + @Override + public String getRequestArgument() { + return arg; + } + + @Override + protected HystrixCommand>> createCommand(Collection> collapsedRequests) { + List args = new ArrayList(collapsedRequests.size()); + for (CollapsedRequest req: collapsedRequests) { + args.add(req.getArgument()); + } + return new MyCommand(args); + } + + @Override + protected void mapResponseToRequests(List> batchResponse, Collection> collapsedRequests) { + for (Pair pair: batchResponse) { + for (CollapsedRequest collapsedReq: collapsedRequests) { + if (collapsedReq.getArgument().equals(pair.a)) { + collapsedReq.setResponse(pair.b); + } + } + } + } + + @Override + protected String getCacheKey() { + return arg; + } + } + + + @Test + public void testDuplicateArgumentsWithRequestCachingOn() throws Exception { + final int NUM = 10; + + List> observables = new ArrayList>(); + for (int i = 0; i < NUM; i++) { + MyCollapser c = new MyCollapser("5", true); + observables.add(c.toObservable()); + } + + List> subscribers = new ArrayList>(); + for (final Observable o: observables) { + final TestSubscriber sub = new TestSubscriber(); + subscribers.add(sub); + + o.subscribe(sub); + } + + Thread.sleep(100); + + //all subscribers should receive the same value + for (TestSubscriber sub: subscribers) { + sub.awaitTerminalEvent(1000, TimeUnit.MILLISECONDS); + System.out.println("Subscriber received : " + sub.getOnNextEvents()); + sub.assertNoErrors(); + sub.assertValues(5); + } + } + + @Test + public void testDuplicateArgumentsWithRequestCachingOff() throws Exception { + final int NUM = 10; + + List> observables = new ArrayList>(); + for (int i = 0; i < NUM; i++) { + MyCollapser c = new MyCollapser("5", false); + observables.add(c.toObservable()); + } + + List> subscribers = new ArrayList>(); + for (final Observable o: observables) { + final TestSubscriber sub = new TestSubscriber(); + subscribers.add(sub); + + o.subscribe(sub); + } + + Thread.sleep(100); + + AtomicInteger numErrors = new AtomicInteger(0); + AtomicInteger numValues = new AtomicInteger(0); + + // only the first subscriber should receive the value. + // the others should get an error that the batch contains duplicates + for (TestSubscriber sub: subscribers) { + sub.awaitTerminalEvent(1000, TimeUnit.MILLISECONDS); + if (sub.getOnCompletedEvents().isEmpty()) { + System.out.println(Thread.currentThread().getName() + " Error : " + sub.getOnErrorEvents()); + sub.assertError(IllegalArgumentException.class); + sub.assertNoValues(); + numErrors.getAndIncrement(); + + } else { + System.out.println(Thread.currentThread().getName() + " OnNext : " + sub.getOnNextEvents()); + sub.assertValues(5); + sub.assertCompleted(); + sub.assertNoErrors(); + numValues.getAndIncrement(); + } + } + + assertEquals(1, numValues.get()); + assertEquals(NUM - 1, numErrors.get()); + } + + @Test + public void testUnsubscribeFromSomeDuplicateArgsDoesNotRemoveFromBatch() throws Exception { + final int NUM = 10; + + List> observables = new ArrayList>(); + for (int i = 0; i < NUM; i++) { + MyCollapser c = new MyCollapser("5", true); + observables.add(c.toObservable()); + } + + List> subscribers = new ArrayList>(); + List subscriptions = new ArrayList(); + + for (final Observable o: observables) { + final TestSubscriber sub = new TestSubscriber(); + subscribers.add(sub); + + Subscription s = o.subscribe(sub); + subscriptions.add(s); + } + + + //unsubscribe from all but 1 + for (int i = 0; i < NUM - 1; i++) { + Subscription s = subscriptions.get(i); + s.unsubscribe(); + } + + Thread.sleep(100); + + //all subscribers with an active subscription should receive the same value + for (TestSubscriber sub: subscribers) { + if (!sub.isUnsubscribed()) { + sub.awaitTerminalEvent(1000, TimeUnit.MILLISECONDS); + System.out.println("Subscriber received : " + sub.getOnNextEvents()); + sub.assertNoErrors(); + sub.assertValues(5); + } else { + System.out.println("Subscriber is unsubscribed"); + } + } + } + @Test public void testUnsubscribeOnOneDoesntKillBatch() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); @@ -391,12 +596,14 @@ public void testRequestVariableLifecycle2() throws Exception { // kick off work (simulating a single request with multiple threads) for (int t = 0; t < 5; t++) { + final int outerLoop = t; Thread th = new Thread(new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() { @Override public void run() { for (int i = 0; i < 100; i++) { - responses.add(new TestRequestCollapser(timer, 1).queue()); + int uniqueInt = (outerLoop * 100) + i; + responses.add(new TestRequestCollapser(timer, uniqueInt).queue()); } } })); @@ -434,7 +641,7 @@ public void run() { // wait for all tasks to complete for (Future f : responses) { - assertEquals("1", f.get(1000, TimeUnit.MILLISECONDS)); + f.get(1000, TimeUnit.MILLISECONDS); } assertEquals("2", response2.get(1000, TimeUnit.MILLISECONDS)); assertEquals("3", response3.get(1000, TimeUnit.MILLISECONDS)); @@ -447,7 +654,7 @@ public void run() { } Iterator> cmdIterator = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().iterator(); - assertEquals(501, cmdIterator.next().getNumberCollapsed()); + assertEquals(500, cmdIterator.next().getNumberCollapsed()); assertEquals(2, cmdIterator.next().getNumberCollapsed()); assertEquals(1, cmdIterator.next().getNumberCollapsed()); @@ -661,8 +868,8 @@ public void testNoRequestCache3() { assertTrue(commandB.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); Iterator> cmdIterator = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().iterator(); - assertEquals(3, cmdIterator.next().getNumberCollapsed()); //1 for A, 2 for B. Batch contains all arguments (including duplicates) - assertEquals(3, cmdIterator.next().getNumberCollapsed()); //1 for A, 2 for B. Batch contains all arguments (including duplicates) + assertEquals(2, cmdIterator.next().getNumberCollapsed()); //1 for A, 1 for B. Batch contains only unique arguments (no duplicates) + assertEquals(2, cmdIterator.next().getNumberCollapsed()); //1 for A, 1 for B. Batch contains only unique arguments (no duplicates) } /** diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java index 2b72e6d33..083950359 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java @@ -36,6 +36,7 @@ import com.netflix.hystrix.collapser.CollapserTimer; import com.netflix.hystrix.collapser.RealCollapserTimer; import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable; +import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler; import com.netflix.hystrix.strategy.properties.HystrixPropertiesCollapserDefault; import org.junit.Before; import org.junit.Rule; @@ -1343,6 +1344,179 @@ public void onNext(String s) { assertEquals(0, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } + class Pair { + final A a; + final B b; + + Pair(A a, B b) { + this.a = a; + this.b = b; + } + } + + class MyCommand extends HystrixObservableCommand> { + + private final List args; + + public MyCommand(List args) { + super(HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("BATCH"))); + this.args = args; + } + + @Override + protected Observable> construct() { + return Observable.from(args).map(new Func1>() { + @Override + public Pair call(String s) { + return new Pair(s, Integer.parseInt(s)); + } + }); + } + } + + class MyCollapser extends HystrixObservableCollapser, Integer, String> { + + private final String arg; + + public MyCollapser(String arg, boolean requestCachingOn) { + super(HystrixCollapserKey.Factory.asKey("UNITTEST"), + HystrixObservableCollapser.Scope.REQUEST, + new RealCollapserTimer(), + HystrixCollapserProperties.Setter().withRequestCacheEnabled(requestCachingOn), + HystrixCollapserMetrics.getInstance(HystrixCollapserKey.Factory.asKey("UNITTEST"), + new HystrixPropertiesCollapserDefault(HystrixCollapserKey.Factory.asKey("UNITTEST"), + HystrixCollapserProperties.Setter()))); + this.arg = arg; + } + + + @Override + public String getRequestArgument() { + return arg; + } + + @Override + protected HystrixObservableCommand> createCommand(Collection> collapsedRequests) { + List args = new ArrayList(); + for (CollapsedRequest req: collapsedRequests) { + args.add(req.getArgument()); + } + + return new MyCommand(args); + } + + @Override + protected Func1, String> getBatchReturnTypeKeySelector() { + return new Func1, String>() { + @Override + public String call(Pair pair) { + return pair.a; + } + }; + } + + @Override + protected Func1 getRequestArgumentKeySelector() { + return new Func1() { + @Override + public String call(String s) { + return s; + } + }; + } + + @Override + protected void onMissingResponse(CollapsedRequest r) { + r.setException(new RuntimeException("missing")); + } + + @Override + protected Func1, Integer> getBatchReturnTypeToResponseTypeMapper() { + return new Func1, Integer>() { + @Override + public Integer call(Pair pair) { + return pair.b; + } + }; + } + } + + @Test + public void testDuplicateArgumentsWithRequestCachingOn() throws Exception { + final int NUM = 10; + + List> observables = new ArrayList>(); + for (int i = 0; i < NUM; i++) { + MyCollapser c = new MyCollapser("5", true); + observables.add(c.toObservable()); + } + + List> subscribers = new ArrayList>(); + for (final Observable o: observables) { + final TestSubscriber sub = new TestSubscriber(); + subscribers.add(sub); + + o.subscribe(sub); + } + + Thread.sleep(100); + + //all subscribers should receive the same value + for (TestSubscriber sub: subscribers) { + sub.awaitTerminalEvent(1000, TimeUnit.MILLISECONDS); + System.out.println("Subscriber received : " + sub.getOnNextEvents()); + sub.assertCompleted(); + sub.assertNoErrors(); + sub.assertValues(5); + } + } + + @Test + public void testDuplicateArgumentsWithRequestCachingOff() throws Exception { + final int NUM = 10; + + List> observables = new ArrayList>(); + for (int i = 0; i < NUM; i++) { + MyCollapser c = new MyCollapser("5", false); + observables.add(c.toObservable()); + } + + List> subscribers = new ArrayList>(); + for (final Observable o: observables) { + final TestSubscriber sub = new TestSubscriber(); + subscribers.add(sub); + + o.subscribe(sub); + } + + Thread.sleep(100); + + AtomicInteger numErrors = new AtomicInteger(0); + AtomicInteger numValues = new AtomicInteger(0); + + // only the first subscriber should receive the value. + // the others should get an error that the batch contains duplicates + for (TestSubscriber sub: subscribers) { + sub.awaitTerminalEvent(1000, TimeUnit.MILLISECONDS); + if (sub.getOnCompletedEvents().isEmpty()) { + System.out.println(Thread.currentThread().getName() + " Error : " + sub.getOnErrorEvents()); + sub.assertError(IllegalArgumentException.class); + sub.assertNoValues(); + numErrors.getAndIncrement(); + + } else { + System.out.println(Thread.currentThread().getName() + " OnNext : " + sub.getOnNextEvents()); + sub.assertValues(5); + sub.assertCompleted(); + sub.assertNoErrors(); + numValues.getAndIncrement(); + } + } + + assertEquals(1, numValues.get()); + assertEquals(NUM - 1, numErrors.get()); + } + protected void assertCommandExecutionEvents(HystrixInvokableInfo command, HystrixEventType... expectedEventTypes) { boolean emitExpected = false; int expectedEmitCount = 0;