diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/BasePayloadSupplier.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/BasePayloadSupplier.java new file mode 100644 index 000000000..7d82215f3 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/BasePayloadSupplier.java @@ -0,0 +1,19 @@ +package com.netflix.hystrix.contrib.reactivesocket; + +import com.fasterxml.jackson.dataformat.cbor.CBORFactory; +import io.reactivesocket.Payload; +import rx.Observable; +import rx.subjects.BehaviorSubject; + +import java.util.function.Supplier; + +public abstract class BasePayloadSupplier implements Supplier> { + protected final CBORFactory jsonFactory; + + protected final BehaviorSubject subject; + + protected BasePayloadSupplier() { + this.jsonFactory = new CBORFactory(); + this.subject = BehaviorSubject.create(); + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java index 48a964edf..35b90e86d 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java @@ -9,8 +9,9 @@ import rx.RxReactiveStreams; /** - * An implementation of {@link RequestHandler} that provides a Hystrix Stream. Takes an integer which corresponds to - * an id in {@link EventStreamEnum}. If the id is found it will be again stream the events to the subscriber. + * An implementation of {@link RequestHandler} that provides a Hystrix Stream. Takes an 32-bit integer in the {@link Payload} + * data of a ReactiveSocket {@link io.reactivesocket.Frame} which corresponds to an id in {@link EventStreamEnum}. If + * the id is found it will begin to stream the events to the subscriber. */ public class EventStreamRequestHandler extends RequestHandler { private static final Logger logger = LoggerFactory.getLogger(EventStreamRequestHandler.class); diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java index 73b3805c2..807e2d50f 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java @@ -1,6 +1,5 @@ package com.netflix.hystrix.contrib.reactivesocket; -import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import io.reactivesocket.Frame; import io.reactivesocket.Payload; @@ -9,25 +8,17 @@ import rx.Observable; import rx.functions.Func0; import rx.schedulers.Schedulers; -import rx.subjects.BehaviorSubject; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import java.util.stream.Stream; -public abstract class StreamingSupplier implements Supplier> { +public abstract class StreamingSupplier extends BasePayloadSupplier { protected Logger logger = LoggerFactory.getLogger(StreamingSupplier.class); - protected BehaviorSubject subject; - - protected final JsonFactory jsonFactory; - protected StreamingSupplier() { - subject = BehaviorSubject.create(); - jsonFactory = new JsonFactory(); Observable .interval(500, TimeUnit.MILLISECONDS, Schedulers.computation()) diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java index 9d47f45de..7cf85a3d8 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java @@ -1,34 +1,27 @@ package com.netflix.hystrix.contrib.reactivesocket.requests; -import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.netflix.hystrix.ExecutionResult; import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier; import com.netflix.hystrix.metric.HystrixRequestEvents; import io.reactivesocket.Frame; import io.reactivesocket.Payload; import org.agrona.LangUtil; import rx.Observable; import rx.schedulers.Schedulers; -import rx.subjects.BehaviorSubject; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -import java.util.function.Supplier; -public class HystrixRequestEventsStream implements Supplier> { +public class HystrixRequestEventsStream extends BasePayloadSupplier { private static HystrixRequestEventsStream INSTANCE = new HystrixRequestEventsStream(); - private BehaviorSubject subject; - - private JsonFactory jsonFactory; - private HystrixRequestEventsStream() { - this.jsonFactory = new JsonFactory(); - this.subject = BehaviorSubject.create(); + super(); com.netflix.hystrix.metric.HystrixRequestEventsStream.getInstance() .observe() diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java index 20d1c70e5..64f780f3d 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java @@ -1,6 +1,5 @@ package com.netflix.hystrix.contrib.reactivesocket.sample; -import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.netflix.hystrix.HystrixCollapserKey; import com.netflix.hystrix.HystrixCommandKey; @@ -10,28 +9,22 @@ import com.netflix.hystrix.config.HystrixConfiguration; import com.netflix.hystrix.config.HystrixConfigurationStream; import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; +import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier; import io.reactivesocket.Frame; import io.reactivesocket.Payload; import org.agrona.LangUtil; import rx.Observable; -import rx.subjects.BehaviorSubject; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; -import java.util.function.Supplier; -public class HystrixConfigStream implements Supplier> { +public class HystrixConfigStream extends BasePayloadSupplier { private static final HystrixConfigStream INSTANCE = new HystrixConfigStream(); - private BehaviorSubject subject; - - private JsonFactory jsonFactory; - private HystrixConfigStream() { - this.subject = BehaviorSubject.create(); - this.jsonFactory = new JsonFactory(); + super(); HystrixConfigurationStream stream = new HystrixConfigurationStream(100); stream diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java index 745ca1a7d..b5f9257da 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java @@ -1,9 +1,9 @@ package com.netflix.hystrix.contrib.reactivesocket.sample; -import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier; import com.netflix.hystrix.metric.sample.HystrixCommandUtilization; import com.netflix.hystrix.metric.sample.HystrixThreadPoolUtilization; import com.netflix.hystrix.metric.sample.HystrixUtilization; @@ -11,24 +11,17 @@ import io.reactivesocket.Payload; import org.agrona.LangUtil; import rx.Observable; -import rx.subjects.BehaviorSubject; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; -import java.util.function.Supplier; -public class HystrixUtilizationStream implements Supplier> { +public class HystrixUtilizationStream extends BasePayloadSupplier { private static final HystrixUtilizationStream INSTANCE = new HystrixUtilizationStream(); - private BehaviorSubject subject; - - private JsonFactory jsonFactory; - private HystrixUtilizationStream() { - this.subject = BehaviorSubject.create(); - this.jsonFactory = new JsonFactory(); + super(); com.netflix.hystrix.metric.sample.HystrixUtilizationStream stream = new com.netflix.hystrix.metric.sample.HystrixUtilizationStream(100);