Skip to content

Commit

Permalink
Make sure that metrics streams are multicast such that the work hapen…
Browse files Browse the repository at this point in the history
…s once and all subscribers get that copy
  • Loading branch information
Matt Jacobs committed Aug 3, 2016
1 parent 079fd7c commit 28f3042
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
import com.netflix.hystrix.metric.HystrixEvent;
import com.netflix.hystrix.metric.HystrixEventStream;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func2;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* Refinement of {@link BucketedCounterStream} which accumulates counters infinitely in the bucket-reduction step
*
Expand All @@ -28,19 +31,35 @@
* @param <Output> type of data emitted to stream subscribers (often is the same as A but does not have to be)
*/
public abstract class BucketedCumulativeCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
private Func2<Output, Bucket, Output> reduceBucket;
private Observable<Output> sourceStream;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
Func2<Bucket, Event, Bucket> reduceCommandCompletion,
Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
this.reduceBucket = reduceBucket;

this.sourceStream = bucketedStream
.scan(getEmptyOutputValue(), reduceBucket)
.skip(numBuckets)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share() //multiple subscribers should get same data
.onBackpressureDrop(); //if there are slow consumers, data should not buffer
}

@Override
public Observable<Output> observe() {
return bucketedStream
.scan(getEmptyOutputValue(), reduceBucket)
.skip(numBuckets);
return sourceStream;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
import com.netflix.hystrix.metric.HystrixEvent;
import com.netflix.hystrix.metric.HystrixEventStream;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* Refinement of {@link BucketedCounterStream} which reduces numBuckets at a time.
*
Expand All @@ -29,24 +32,44 @@
* @param <Output> type of data emitted to stream subscribers (often is the same as A but does not have to be)
*/
public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
private final Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary;
private Observable<Output> sourceStream;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
final Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
this.reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
@Override
public Observable<Output> call(Observable<Bucket> window) {
return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
}
};
this.sourceStream = bucketedStream //stream broken up into buckets
.window(numBuckets, 1) //emit overlapping windows of buckets
.flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share() //multiple subscribers should get same data
.onBackpressureDrop(); //if there are slow consumers, data should not buffer
}

@Override
public Observable<Output> observe() {
return bucketedStream //stream broken up into buckets
.window(numBuckets, 1) //emit overlapping windows of buckets
.flatMap(reduceWindowToSummary); //convert a window of bucket-summaries into a single summary
return sourceStream;
}

/* package-private */ boolean isSourceCurrentlySubscribed() {
return isSourceCurrentlySubscribed.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,22 @@ public Integer call(HystrixCommandExecutionStarted event) {
}
};



protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
for (int i = 0; i < numBuckets; i++) {
emptyRollingMaxBuckets.add(0);
}

rollingMaxStream = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
Observable<Integer> eventConcurrencyStream = inputEventStream
.observe()
.map(getConcurrencyCountFromEvent);

return eventConcurrencyStream
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
.flatMap(reduceStreamToMax)
.startWith(emptyRollingMaxBuckets)
.window(numBuckets, 1)
.flatMap(reduceStreamToMax);
}
}).share();
rollingMaxStream = inputEventStream
.observe()
.map(getConcurrencyCountFromEvent)
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
.flatMap(reduceStreamToMax)
.startWith(emptyRollingMaxBuckets)
.window(numBuckets, 1)
.flatMap(reduceStreamToMax)
.share()
.onBackpressureDrop();
}

public void startCachingStreamValuesIfUnstarted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,16 @@ public Observable<Histogram> call(Observable<Event> bucket) {
}
};

rollingDistributionStream = Observable.defer(new Func0<Observable<CachedValuesHistogram>>() {
@Override
public Observable<CachedValuesHistogram> call() {
return stream
.observe()
.window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
.flatMap(reduceBucketToSingleDistribution) //stream of aggregated Histograms
.startWith(emptyDistributionsToStart) //stream of aggregated Histograms that starts with n empty
.window(numBuckets, 1) //windowed stream: each OnNext is a stream of n Histograms
.flatMap(reduceWindowToSingleDistribution) //reduced stream: each OnNext is a single Histogram
.map(cacheHistogramValues); //convert to CachedValueHistogram (commonly-accessed values are cached)
}
}).share(); //multicast
rollingDistributionStream = stream
.observe()
.window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
.flatMap(reduceBucketToSingleDistribution) //stream of aggregated Histograms
.startWith(emptyDistributionsToStart) //stream of aggregated Histograms that starts with n empty
.window(numBuckets, 1) //windowed stream: each OnNext is a stream of n Histograms
.flatMap(reduceWindowToSingleDistribution) //reduced stream: each OnNext is a single Histogram
.map(cacheHistogramValues) //convert to CachedValueHistogram (commonly-accessed values are cached)
.share()
.onBackpressureDrop();
}

public Observable<CachedValuesHistogram> observe() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.hystrix.metric.consumer;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandMetrics;
Expand All @@ -27,12 +28,19 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.*;

Expand Down Expand Up @@ -462,4 +470,153 @@ public void testMultipleEventsOverTimeGetStoredAndAgeOut() {
assertEquals(0L, stream.getLatest().getErrorCount());
assertEquals(0L, stream.getLatest().getTotalRequests());
}

@Test
public void testSharedSourceStream() throws InterruptedException {
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("CMD-Health-N");
stream = HealthCountsStream.getInstance(key, 10, 100);

final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean allEqual = new AtomicBoolean(false);

Observable<HystrixCommandMetrics.HealthCounts> o1 = stream
.observe()
.take(10)
.observeOn(Schedulers.computation());

Observable<HystrixCommandMetrics.HealthCounts> o2 = stream
.observe()
.take(10)
.observeOn(Schedulers.computation());

Observable<Boolean> zipped = Observable.zip(o1, o2, new Func2<HystrixCommandMetrics.HealthCounts, HystrixCommandMetrics.HealthCounts, Boolean>() {
@Override
public Boolean call(HystrixCommandMetrics.HealthCounts healthCounts, HystrixCommandMetrics.HealthCounts healthCounts2) {
return healthCounts == healthCounts2; //we want object equality
}
});
Observable < Boolean > reduced = zipped.reduce(true, new Func2<Boolean, Boolean, Boolean>() {
@Override
public Boolean call(Boolean a, Boolean b) {
return a && b;
}
});

reduced.subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Reduced OnCompleted");
latch.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Reduced OnError : " + e);
e.printStackTrace();
latch.countDown();
}

@Override
public void onNext(Boolean b) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Reduced OnNext : " + b);
allEqual.set(b);
}
});

for (int i = 0; i < 10; i++) {
HystrixCommand<Integer> cmd = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS, 20);
cmd.execute();
}

assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
assertTrue(allEqual.get());
//we should be getting the same object from both streams. this ensures that multiple subscribers don't induce extra work
}

@Test
public void testTwoSubscribersOneUnsubscribes() throws Exception {
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("CMD-Health-O");
stream = HealthCountsStream.getInstance(key, 10, 100);

final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
final AtomicInteger healthCounts1 = new AtomicInteger(0);
final AtomicInteger healthCounts2 = new AtomicInteger(0);

Subscription s1 = stream
.observe()
.take(10)
.observeOn(Schedulers.computation())
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
latch1.countDown();
}
})
.subscribe(new Subscriber<HystrixCommandMetrics.HealthCounts>() {
@Override
public void onCompleted() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Health 1 OnCompleted");
latch1.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Health 1 OnError : " + e);
latch1.countDown();
}

@Override
public void onNext(HystrixCommandMetrics.HealthCounts healthCounts) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Health 1 OnNext : " + healthCounts);
healthCounts1.incrementAndGet();
}
});

Subscription s2 = stream
.observe()
.take(10)
.observeOn(Schedulers.computation())
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
latch2.countDown();
}
})
.subscribe(new Subscriber<HystrixCommandMetrics.HealthCounts>() {
@Override
public void onCompleted() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Health 2 OnCompleted");
latch2.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Health 2 OnError : " + e);
latch2.countDown();
}

@Override
public void onNext(HystrixCommandMetrics.HealthCounts healthCounts) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Health 2 OnNext : " + healthCounts + " : " + healthCounts2.get());
healthCounts2.incrementAndGet();
}
});
//execute 5 commands, then unsubscribe from first stream. then execute the rest
for (int i = 0; i < 10; i++) {
HystrixCommand<Integer> cmd = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS, 20);
cmd.execute();
if (i == 5) {
s1.unsubscribe();
}
}
assertTrue(stream.isSourceCurrentlySubscribed()); //only 1/2 subscriptions has been cancelled

assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS));
assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS));
System.out.println("s1 got : " + healthCounts1.get() + ", s2 got : " + healthCounts2.get());
assertTrue("s1 got data", healthCounts1.get() > 0);
assertTrue("s2 got data", healthCounts2.get() > 0);
assertTrue("s1 got less data than s2", healthCounts2.get() > healthCounts1.get());
}
}

0 comments on commit 28f3042

Please sign in to comment.