Skip to content

Commit

Permalink
Merge pull request Netflix#1246 from mattrjacobs/move-dashboard-strea…
Browse files Browse the repository at this point in the history
…m-to-hystrix-core

Move HystrixDashboardStream to hystrix-core
  • Loading branch information
mattrjacobs authored Jun 14, 2016
2 parents 25da664 + 669ee21 commit 92d8659
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixRequestEvents;
import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixUtilization;
import com.netflix.hystrix.metric.HystrixRequestEventsStream;
import com.netflix.hystrix.metric.consumer.HystrixDashboardStream;
import com.netflix.hystrix.metric.sample.HystrixUtilizationStream;
import io.reactivesocket.Payload;
import rx.Observable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolMetrics;
import com.netflix.hystrix.contrib.reactivesocket.HystrixDashboardStream;
import com.netflix.hystrix.metric.consumer.HystrixDashboardStream;
import org.agrona.LangUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix.contrib.reactivesocket;
package com.netflix.hystrix.metric.consumer;

import com.netflix.hystrix.HystrixCollapserMetrics;
import com.netflix.hystrix.HystrixCommandMetrics;
import com.netflix.hystrix.HystrixThreadPoolMetrics;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func1;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,13 +34,28 @@ public class HystrixDashboardStream {
private HystrixDashboardStream(int delayInMs) {
this.delayInMs = delayInMs;
this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS)
.map(timestamp -> new DashboardData(
HystrixCommandMetrics.getInstances(),
HystrixThreadPoolMetrics.getInstances(),
HystrixCollapserMetrics.getInstances()
))
.doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true))
.doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false))
.map(new Func1<Long, DashboardData>() {
@Override
public DashboardData call(Long timestamp) {
return new DashboardData(
HystrixCommandMetrics.getInstances(),
HystrixThreadPoolMetrics.getInstances(),
HystrixCollapserMetrics.getInstances()
);
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share()
.onBackpressureDrop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix.contrib.reactivesocket;
package com.netflix.hystrix.metric.consumer;

import com.hystrix.junit.HystrixRequestContextRule;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandMetrics;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.metric.CommandStreamTest;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Actions;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
Expand All @@ -34,9 +41,14 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class HystrixDashboardStreamTest extends HystrixStreamTest {
public class HystrixDashboardStreamTest extends CommandStreamTest {

@Rule
public HystrixRequestContextRule ctx = new HystrixRequestContextRule();

HystrixDashboardStream stream;
private final static HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Dashboard");
private final static HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("DashboardCommand");

@Before
public void init() {
Expand All @@ -46,26 +58,37 @@ public void init() {
@Test
public void testStreamHasData() throws Exception {
final AtomicBoolean commandShowsUp = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(1);
final int NUM = 10;

for (int i = 0; i < 2; i++) {
HystrixCommand<Integer> cmd = new SyntheticBlockingCommand();
HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
cmd.observe();
}

stream.observe().take(NUM).subscribe(dashboardData -> {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Received data with : " + dashboardData.commandMetrics.size() + " commands");
for (HystrixCommandMetrics metrics: dashboardData.commandMetrics) {
if (metrics.getCommandKey().name().equals("SyntheticBlockingCommand")) {
commandShowsUp.set(true);
stream.observe().take(NUM).subscribe(
new Subscriber<HystrixDashboardStream.DashboardData>() {
@Override
public void onCompleted() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted");
latch.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnError : " + e);
latch.countDown();
}

@Override
public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Received data with : " + dashboardData.commandMetrics.size() + " commands");
for (HystrixCommandMetrics metrics : dashboardData.commandMetrics) {
if (metrics.getCommandKey().equals(commandKey)) {
commandShowsUp.set(true);
}
}
}
},
Actions.empty(),
() -> {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted");
latch.countDown();
});

assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
Expand All @@ -74,15 +97,20 @@ public void testStreamHasData() throws Exception {

@Test
public void testTwoSubscribersOneUnsubscribes() throws Exception {
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
AtomicInteger payloads1 = new AtomicInteger(0);
AtomicInteger payloads2 = new AtomicInteger(0);
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
final AtomicInteger payloads1 = new AtomicInteger(0);
final AtomicInteger payloads2 = new AtomicInteger(0);

Subscription s1 = stream
.observe()
.take(100)
.doOnUnsubscribe(latch1::countDown)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
latch1.countDown();
}
})
.subscribe(new Subscriber<HystrixDashboardStream.DashboardData>() {
@Override
public void onCompleted() {
Expand All @@ -106,7 +134,12 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
Subscription s2 = stream
.observe()
.take(100)
.doOnUnsubscribe(latch2::countDown)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
latch2.countDown();
}
})
.subscribe(new Subscriber<HystrixDashboardStream.DashboardData>() {
@Override
public void onCompleted() {
Expand All @@ -128,7 +161,7 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
});
//execute 1 command, then unsubscribe from first stream. then execute the rest
for (int i = 0; i < 50; i++) {
HystrixCommand<Integer> cmd = new SyntheticBlockingCommand();
HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
cmd.execute();
if (i == 1) {
s1.unsubscribe();
Expand All @@ -145,15 +178,20 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {

@Test
public void testTwoSubscribersBothUnsubscribe() throws Exception {
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
AtomicInteger payloads1 = new AtomicInteger(0);
AtomicInteger payloads2 = new AtomicInteger(0);
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
final AtomicInteger payloads1 = new AtomicInteger(0);
final AtomicInteger payloads2 = new AtomicInteger(0);

Subscription s1 = stream
.observe()
.take(10)
.doOnUnsubscribe(latch1::countDown)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
latch1.countDown();
}
})
.subscribe(new Subscriber<HystrixDashboardStream.DashboardData>() {
@Override
public void onCompleted() {
Expand All @@ -177,7 +215,12 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
Subscription s2 = stream
.observe()
.take(10)
.doOnUnsubscribe(latch2::countDown)
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
latch2.countDown();
}
})
.subscribe(new Subscriber<HystrixDashboardStream.DashboardData>() {
@Override
public void onCompleted() {
Expand All @@ -199,7 +242,7 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {
});
//execute half the commands, then unsubscribe from both streams, then execute the rest
for (int i = 0; i < 50; i++) {
HystrixCommand<Integer> cmd = new SyntheticBlockingCommand();
HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
cmd.execute();
if (i == 25) {
s1.unsubscribe();
Expand All @@ -217,25 +260,33 @@ public void onNext(HystrixDashboardStream.DashboardData dashboardData) {

@Test
public void testTwoSubscribersOneSlowOneFast() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean foundError = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean foundError = new AtomicBoolean(false);

Observable<HystrixDashboardStream.DashboardData> fast = stream
.observe()
.observeOn(Schedulers.newThread());
Observable<HystrixDashboardStream.DashboardData> slow = stream
.observe()
.observeOn(Schedulers.newThread())
.map(n -> {
try {
Thread.sleep(100);
return n;
} catch (InterruptedException ex) {
return n;
.map(new Func1<HystrixDashboardStream.DashboardData, HystrixDashboardStream.DashboardData>() {
@Override
public HystrixDashboardStream.DashboardData call(HystrixDashboardStream.DashboardData n) {
try {
Thread.sleep(100);
return n;
} catch (InterruptedException ex) {
return n;
}
}
});

Observable<Boolean> checkZippedEqual = Observable.zip(fast, slow, (payload, payload2) -> payload == payload2);
Observable<Boolean> checkZippedEqual = Observable.zip(fast, slow, new Func2<HystrixDashboardStream.DashboardData, HystrixDashboardStream.DashboardData, Boolean>() {
@Override
public Boolean call(HystrixDashboardStream.DashboardData payload, HystrixDashboardStream.DashboardData payload2) {
return payload == payload2;
}
});

Subscription s1 = checkZippedEqual
.take(10000)
Expand All @@ -261,7 +312,7 @@ public void onNext(Boolean b) {
});

for (int i = 0; i < 50; i++) {
HystrixCommand<Integer> cmd = new SyntheticBlockingCommand();
HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
cmd.execute();
}

Expand Down

0 comments on commit 92d8659

Please sign in to comment.