Skip to content

Commit

Permalink
Merge pull request Netflix#1304 from mattrjacobs/multicast-metrics-st…
Browse files Browse the repository at this point in the history
…reams

Make sure that metrics streams are multicast
  • Loading branch information
mattrjacobs authored Aug 3, 2016
2 parents 079fd7c + 28f3042 commit 0e22141
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
import com.netflix.hystrix.metric.HystrixEvent;
import com.netflix.hystrix.metric.HystrixEventStream;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func2;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* Refinement of {@link BucketedCounterStream} which accumulates counters infinitely in the bucket-reduction step
*
Expand All @@ -28,19 +31,35 @@
* @param <Output> type of data emitted to stream subscribers (often is the same as A but does not have to be)
*/
public abstract class BucketedCumulativeCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
private Func2<Output, Bucket, Output> reduceBucket;
private Observable<Output> sourceStream;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
Func2<Bucket, Event, Bucket> reduceCommandCompletion,
Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
this.reduceBucket = reduceBucket;

this.sourceStream = bucketedStream
.scan(getEmptyOutputValue(), reduceBucket)
.skip(numBuckets)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share() //multiple subscribers should get same data
.onBackpressureDrop(); //if there are slow consumers, data should not buffer
}

@Override
public Observable<Output> observe() {
return bucketedStream
.scan(getEmptyOutputValue(), reduceBucket)
.skip(numBuckets);
return sourceStream;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
import com.netflix.hystrix.metric.HystrixEvent;
import com.netflix.hystrix.metric.HystrixEventStream;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* Refinement of {@link BucketedCounterStream} which reduces numBuckets at a time.
*
Expand All @@ -29,24 +32,44 @@
* @param <Output> type of data emitted to stream subscribers (often is the same as A but does not have to be)
*/
public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
private final Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary;
private Observable<Output> sourceStream;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
final Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
this.reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
@Override
public Observable<Output> call(Observable<Bucket> window) {
return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
}
};
this.sourceStream = bucketedStream //stream broken up into buckets
.window(numBuckets, 1) //emit overlapping windows of buckets
.flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share() //multiple subscribers should get same data
.onBackpressureDrop(); //if there are slow consumers, data should not buffer
}

@Override
public Observable<Output> observe() {
return bucketedStream //stream broken up into buckets
.window(numBuckets, 1) //emit overlapping windows of buckets
.flatMap(reduceWindowToSummary); //convert a window of bucket-summaries into a single summary
return sourceStream;
}

/* package-private */ boolean isSourceCurrentlySubscribed() {
return isSourceCurrentlySubscribed.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,22 @@ public Integer call(HystrixCommandExecutionStarted event) {
}
};



protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
for (int i = 0; i < numBuckets; i++) {
emptyRollingMaxBuckets.add(0);
}

rollingMaxStream = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
Observable<Integer> eventConcurrencyStream = inputEventStream
.observe()
.map(getConcurrencyCountFromEvent);

return eventConcurrencyStream
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
.flatMap(reduceStreamToMax)
.startWith(emptyRollingMaxBuckets)
.window(numBuckets, 1)
.flatMap(reduceStreamToMax);
}
}).share();
rollingMaxStream = inputEventStream
.observe()
.map(getConcurrencyCountFromEvent)
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
.flatMap(reduceStreamToMax)
.startWith(emptyRollingMaxBuckets)
.window(numBuckets, 1)
.flatMap(reduceStreamToMax)
.share()
.onBackpressureDrop();
}

public void startCachingStreamValuesIfUnstarted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,16 @@ public Observable<Histogram> call(Observable<Event> bucket) {
}
};

rollingDistributionStream = Observable.defer(new Func0<Observable<CachedValuesHistogram>>() {
@Override
public Observable<CachedValuesHistogram> call() {
return stream
.observe()
.window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
.flatMap(reduceBucketToSingleDistribution) //stream of aggregated Histograms
.startWith(emptyDistributionsToStart) //stream of aggregated Histograms that starts with n empty
.window(numBuckets, 1) //windowed stream: each OnNext is a stream of n Histograms
.flatMap(reduceWindowToSingleDistribution) //reduced stream: each OnNext is a single Histogram
.map(cacheHistogramValues); //convert to CachedValueHistogram (commonly-accessed values are cached)
}
}).share(); //multicast
rollingDistributionStream = stream
.observe()
.window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
.flatMap(reduceBucketToSingleDistribution) //stream of aggregated Histograms
.startWith(emptyDistributionsToStart) //stream of aggregated Histograms that starts with n empty
.window(numBuckets, 1) //windowed stream: each OnNext is a stream of n Histograms
.flatMap(reduceWindowToSingleDistribution) //reduced stream: each OnNext is a single Histogram
.map(cacheHistogramValues) //convert to CachedValueHistogram (commonly-accessed values are cached)
.share()
.onBackpressureDrop();
}

public Observable<CachedValuesHistogram> observe() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.hystrix.metric.consumer;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandMetrics;
Expand All @@ -27,12 +28,19 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.*;

Expand Down Expand Up @@ -462,4 +470,153 @@ public void testMultipleEventsOverTimeGetStoredAndAgeOut() {
assertEquals(0L, stream.getLatest().getErrorCount());
assertEquals(0L, stream.getLatest().getTotalRequests());
}

@Test
public void testSharedSourceStream() throws InterruptedException {
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("CMD-Health-N");
stream = HealthCountsStream.getInstance(key, 10, 100);

final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean allEqual = new AtomicBoolean(false);

Observable<HystrixCommandMetrics.HealthCounts> o1 = stream
.observe()
.take(10)
.observeOn(Schedulers.computation());

Observable<HystrixCommandMetrics.HealthCounts> o2 = stream
.observe()
.take(10)
.observeOn(Schedulers.computation());

Observable<Boolean> zipped = Observable.zip(o1, o2, new Func2<HystrixCommandMetrics.HealthCounts, HystrixCommandMetrics.HealthCounts, Boolean>() {
@Override
public Boolean call(HystrixCommandMetrics.HealthCounts healthCounts, HystrixCommandMetrics.HealthCounts healthCounts2) {
return healthCounts == healthCounts2; //we want object equality
}
});
Observable < Boolean > reduced = zipped.reduce(true, new Func2<Boolean, Boolean, Boolean>() {
@Override
public Boolean call(Boolean a, Boolean b) {
return a && b;
}
});

reduced.subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Reduced OnCompleted");
latch.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Reduced OnError : " + e);
e.printStackTrace();
latch.countDown();
}

@Override
public void onNext(Boolean b) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Reduced OnNext : " + b);
allEqual.set(b);
}
});

for (int i = 0; i < 10; i++) {
HystrixCommand<Integer> cmd = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS, 20);
cmd.execute();
}

assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
assertTrue(allEqual.get());
//we should be getting the same object from both streams. this ensures that multiple subscribers don't induce extra work
}

@Test
public void testTwoSubscribersOneUnsubscribes() throws Exception {
HystrixCommandKey key = HystrixCommandKey.Factory.asKey("CMD-Health-O");
stream = HealthCountsStream.getInstance(key, 10, 100);

final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
final AtomicInteger healthCounts1 = new AtomicInteger(0);
final AtomicInteger healthCounts2 = new AtomicInteger(0);

Subscription s1 = stream
.observe()
.take(10)
.observeOn(Schedulers.computation())
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
latch1.countDown();
}
})
.subscribe(new Subscriber<HystrixCommandMetrics.HealthCounts>() {
@Override
public void onCompleted() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Health 1 OnCompleted");
latch1.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Health 1 OnError : " + e);
latch1.countDown();
}

@Override
public void onNext(HystrixCommandMetrics.HealthCounts healthCounts) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Health 1 OnNext : " + healthCounts);
healthCounts1.incrementAndGet();
}
});

Subscription s2 = stream
.observe()
.take(10)
.observeOn(Schedulers.computation())
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
latch2.countDown();
}
})
.subscribe(new Subscriber<HystrixCommandMetrics.HealthCounts>() {
@Override
public void onCompleted() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Health 2 OnCompleted");
latch2.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Health 2 OnError : " + e);
latch2.countDown();
}

@Override
public void onNext(HystrixCommandMetrics.HealthCounts healthCounts) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Health 2 OnNext : " + healthCounts + " : " + healthCounts2.get());
healthCounts2.incrementAndGet();
}
});
//execute 5 commands, then unsubscribe from first stream. then execute the rest
for (int i = 0; i < 10; i++) {
HystrixCommand<Integer> cmd = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS, 20);
cmd.execute();
if (i == 5) {
s1.unsubscribe();
}
}
assertTrue(stream.isSourceCurrentlySubscribed()); //only 1/2 subscriptions has been cancelled

assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS));
assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS));
System.out.println("s1 got : " + healthCounts1.get() + ", s2 got : " + healthCounts2.get());
assertTrue("s1 got data", healthCounts1.get() > 0);
assertTrue("s2 got data", healthCounts2.get() > 0);
assertTrue("s1 got less data than s2", healthCounts2.get() > healthCounts1.get());
}
}

0 comments on commit 0e22141

Please sign in to comment.