Skip to content

Commit

Permalink
switched to cbor and added a base class
Browse files Browse the repository at this point in the history
  • Loading branch information
robertroeser committed May 23, 2016
1 parent bb8f153 commit bdfd8f8
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.netflix.hystrix.contrib.reactivesocket;

import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import io.reactivesocket.Payload;
import rx.Observable;
import rx.subjects.BehaviorSubject;

import java.util.function.Supplier;

public abstract class BasePayloadSupplier implements Supplier<Observable<Payload>> {
protected final CBORFactory jsonFactory;

protected final BehaviorSubject<Payload> subject;

protected BasePayloadSupplier() {
this.jsonFactory = new CBORFactory();
this.subject = BehaviorSubject.create();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
import rx.RxReactiveStreams;

/**
* An implementation of {@link RequestHandler} that provides a Hystrix Stream. Takes an integer which corresponds to
* an id in {@link EventStreamEnum}. If the id is found it will be again stream the events to the subscriber.
* An implementation of {@link RequestHandler} that provides a Hystrix Stream. Takes an 32-bit integer in the {@link Payload}
* data of a ReactiveSocket {@link io.reactivesocket.Frame} which corresponds to an id in {@link EventStreamEnum}. If
* the id is found it will begin to stream the events to the subscriber.
*/
public class EventStreamRequestHandler extends RequestHandler {
private static final Logger logger = LoggerFactory.getLogger(EventStreamRequestHandler.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.netflix.hystrix.contrib.reactivesocket;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import io.reactivesocket.Frame;
import io.reactivesocket.Payload;
Expand All @@ -9,25 +8,17 @@
import rx.Observable;
import rx.functions.Func0;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;

public abstract class StreamingSupplier<T> implements Supplier<Observable<Payload>> {
public abstract class StreamingSupplier<T> extends BasePayloadSupplier {

protected Logger logger = LoggerFactory.getLogger(StreamingSupplier.class);

protected BehaviorSubject<Payload> subject;

protected final JsonFactory jsonFactory;

protected StreamingSupplier() {
subject = BehaviorSubject.create();
jsonFactory = new JsonFactory();

Observable
.interval(500, TimeUnit.MILLISECONDS, Schedulers.computation())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,27 @@
package com.netflix.hystrix.contrib.reactivesocket.requests;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.netflix.hystrix.ExecutionResult;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier;
import com.netflix.hystrix.metric.HystrixRequestEvents;
import io.reactivesocket.Frame;
import io.reactivesocket.Payload;
import org.agrona.LangUtil;
import rx.Observable;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class HystrixRequestEventsStream implements Supplier<Observable<Payload>> {
public class HystrixRequestEventsStream extends BasePayloadSupplier {
private static HystrixRequestEventsStream INSTANCE = new HystrixRequestEventsStream();

private BehaviorSubject<Payload> subject;

private JsonFactory jsonFactory;

private HystrixRequestEventsStream() {
this.jsonFactory = new JsonFactory();
this.subject = BehaviorSubject.create();
super();

com.netflix.hystrix.metric.HystrixRequestEventsStream.getInstance()
.observe()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.netflix.hystrix.contrib.reactivesocket.sample;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCommandKey;
Expand All @@ -10,28 +9,22 @@
import com.netflix.hystrix.config.HystrixConfiguration;
import com.netflix.hystrix.config.HystrixConfigurationStream;
import com.netflix.hystrix.config.HystrixThreadPoolConfiguration;
import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier;
import io.reactivesocket.Frame;
import io.reactivesocket.Payload;
import org.agrona.LangUtil;
import rx.Observable;
import rx.subjects.BehaviorSubject;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.function.Supplier;

public class HystrixConfigStream implements Supplier<Observable<Payload>> {
public class HystrixConfigStream extends BasePayloadSupplier {
private static final HystrixConfigStream INSTANCE = new HystrixConfigStream();

private BehaviorSubject<Payload> subject;

private JsonFactory jsonFactory;

private HystrixConfigStream() {
this.subject = BehaviorSubject.create();
this.jsonFactory = new JsonFactory();
super();

HystrixConfigurationStream stream = new HystrixConfigurationStream(100);
stream
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,27 @@
package com.netflix.hystrix.contrib.reactivesocket.sample;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier;
import com.netflix.hystrix.metric.sample.HystrixCommandUtilization;
import com.netflix.hystrix.metric.sample.HystrixThreadPoolUtilization;
import com.netflix.hystrix.metric.sample.HystrixUtilization;
import io.reactivesocket.Frame;
import io.reactivesocket.Payload;
import org.agrona.LangUtil;
import rx.Observable;
import rx.subjects.BehaviorSubject;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.function.Supplier;

public class HystrixUtilizationStream implements Supplier<Observable<Payload>> {
public class HystrixUtilizationStream extends BasePayloadSupplier {
private static final HystrixUtilizationStream INSTANCE = new HystrixUtilizationStream();

private BehaviorSubject<Payload> subject;

private JsonFactory jsonFactory;

private HystrixUtilizationStream() {
this.subject = BehaviorSubject.create();
this.jsonFactory = new JsonFactory();
super();

com.netflix.hystrix.metric.sample.HystrixUtilizationStream stream
= new com.netflix.hystrix.metric.sample.HystrixUtilizationStream(100);
Expand Down

0 comments on commit bdfd8f8

Please sign in to comment.