From 206d2a2803128bb74086ff4bfa86546992a6c049 Mon Sep 17 00:00:00 2001 From: Robert Roeser Date: Thu, 19 May 2016 21:19:29 -0700 Subject: [PATCH] added test and enum findby id --- gradle/wrapper/gradle-wrapper.properties | 4 +- .../build.gradle | 2 + .../reactivesocket/EventStreamEnum.java | 6 +- .../EventStreamRequestHandler.java | 6 +- .../reactivesocket/StreamingSupplier.java | 4 +- .../reactivesocket/EventStreamEnumTest.java | 8 ++ .../EventStreamRequestHandlerTest.java | 111 ++++++++++++++++++ .../HystrixCollasperMetricsStreamTest.java | 49 ++++++++ .../HystrixCommandMetricsStreamTest.java | 51 ++++++++ .../sample/HystrixConfigStreamTest.java | 50 ++++++++ 10 files changed, 285 insertions(+), 6 deletions(-) create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnumTest.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStreamTest.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStreamTest.java create mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStreamTest.java diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 5fd3d2d5e..43c4dcbe3 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Wed Dec 02 15:47:21 PST 2015 +#Thu May 19 16:56:49 PDT 2016 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-all.zip diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle b/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle index b8baf507c..44a2d8e63 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle @@ -4,6 +4,8 @@ repositories { maven { url 'https://dl.bintray.com/reactivesocket/ReactiveSocket' } } +sourceCompatibility = JavaVersion.VERSION_1_8 +targetCompatibility = JavaVersion.VERSION_1_8 dependencies { compile project(':hystrix-core') diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java index 8658fd3ad..79611a70a 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java @@ -53,10 +53,14 @@ public Observable get() { public static EventStreamEnum findByTypeId(int typeId) { return Arrays - .asList(EventStreamEnum.findByTypeId(typeId)) + .asList(EventStreamEnum.values()) .stream() .filter(t -> t.typeId == typeId) .findAny() .orElseThrow(() -> new IllegalStateException("no type id found for id => " + typeId)); } + + public int getTypeId() { + return typeId; + } } diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java index 0e78ee1de..48a964edf 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java @@ -41,9 +41,11 @@ public Publisher handleSubscription(Payload payload) { logger.error(t.getMessage(), t); return Observable.error(t); } - }); + }) + .onBackpressureDrop(); - return RxReactiveStreams.toPublisher(defer); + return RxReactiveStreams + .toPublisher(defer); } @Override diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java index 904bd19bf..73b3805c2 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java @@ -50,7 +50,9 @@ public ByteBuffer getMetadata() { subject.onNext(p); }) - ); + ) + .retry() + .subscribe(); } public boolean filter(T t) { diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnumTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnumTest.java new file mode 100644 index 000000000..6a975f366 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnumTest.java @@ -0,0 +1,8 @@ +package com.netflix.hystrix.contrib.reactivesocket; + + +public class EventStreamEnumTest { + public void test() { + + } +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java new file mode 100644 index 000000000..fc1667943 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java @@ -0,0 +1,111 @@ +package com.netflix.hystrix.contrib.reactivesocket; + + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; +import org.agrona.BitUtil; +import org.junit.Assert; +import org.junit.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import rx.schedulers.Schedulers; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class EventStreamRequestHandlerTest { + @Test + public void testEventStream() throws Exception { + Payload payload = new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer + .allocate(BitUtil.SIZE_OF_INT) + .putInt(EventStreamEnum.METRICS_STREAM.getTypeId()); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }; + + Schedulers + .io() + .createWorker() + .schedulePeriodically(() -> { + TestCommand testCommand = new TestCommand(); + testCommand.execute(); + }, 0, 1, TimeUnit.MILLISECONDS); + + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latch1 = new CountDownLatch(5); + CountDownLatch latch2 = new CountDownLatch(15); + + AtomicReference subscriptionAtomicReference = new AtomicReference<>(); + + EventStreamRequestHandler handler = new EventStreamRequestHandler(); + Publisher payloadPublisher = handler.handleSubscription(payload); + + payloadPublisher + .subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + subscriptionAtomicReference.set(s); + latch.countDown(); + } + + @Override + public void onNext(Payload payload) { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); + + System.out.println(s); + + latch1.countDown(); + latch2.countDown(); + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onComplete() { + + } + }); + + latch.await(); + + Subscription subscription = subscriptionAtomicReference.get(); + subscription.request(5); + + latch1.await(); + + long count = latch2.getCount(); + Assert.assertTrue(count < 15); + + subscription.request(100); + + latch2.await(); + + } + + class TestCommand extends HystrixCommand { + protected TestCommand() { + super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); + } + + @Override + protected Boolean run() throws Exception { + return true; + } + } +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStreamTest.java new file mode 100644 index 000000000..7071ac6ec --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollasperMetricsStreamTest.java @@ -0,0 +1,49 @@ +package com.netflix.hystrix.contrib.reactivesocket.metrics; + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; + +public class HystrixCollasperMetricsStreamTest { + + @Test + public void test() throws Exception { + CountDownLatch latch = new CountDownLatch(21); + HystrixCommandMetricsStream + .getInstance() + .get() + .subscribe(payload -> { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); + + System.out.println(s); + latch.countDown(); + }); + + + for (int i = 0; i < 20; i++) { + TestCommand test = new TestCommand(); + + test.execute(); + latch.countDown(); + } + + latch.await(); + } + + class TestCommand extends HystrixCommand { + protected TestCommand() { + super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); + } + + @Override + protected Boolean run() throws Exception { + return true; + } + } + + +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStreamTest.java new file mode 100644 index 000000000..7cbe344f5 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStreamTest.java @@ -0,0 +1,51 @@ +package com.netflix.hystrix.contrib.reactivesocket.metrics; + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; + +/** + * Created by rroeser on 5/19/16. + */ +public class HystrixCommandMetricsStreamTest { + @Test + public void test() throws Exception { + CountDownLatch latch = new CountDownLatch(23); + HystrixCommandMetricsStream + .getInstance() + .get() + .subscribe(payload -> { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); + + System.out.println(s); + latch.countDown(); + }); + + for (int i = 0; i < 20; i++) { + TestCommand test = new TestCommand(latch); + + test.execute(); + } + + latch.await(); + } + + class TestCommand extends HystrixCommand { + CountDownLatch latch; + protected TestCommand(CountDownLatch latch) { + super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); + this.latch = latch; + } + + @Override + protected Boolean run() throws Exception { + latch.countDown(); + return true; + } + } + +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStreamTest.java new file mode 100644 index 000000000..5212bab00 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStreamTest.java @@ -0,0 +1,50 @@ +package com.netflix.hystrix.contrib.reactivesocket.sample; + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCommandMetricsStream; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; + +/** + * Created by rroeser on 5/19/16. + */ +public class HystrixConfigStreamTest { + @Test + public void test() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + HystrixCommandMetricsStream + .getInstance() + .get() + .subscribe(payload -> { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); + + System.out.println(s); + latch.countDown(); + }); + + + for (int i = 0; i < 20; i++) { + TestCommand test = new TestCommand(); + + test.execute(); + } + + latch.await(); + } + + class TestCommand extends HystrixCommand { + protected TestCommand() { + super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); + } + + @Override + protected Boolean run() throws Exception { + System.out.println("IM A HYSTRIX COMMAND!!!!!"); + return true; + } + } +} \ No newline at end of file