Skip to content

Commit

Permalink
updated gradle
Browse files Browse the repository at this point in the history
  • Loading branch information
robertroeser committed May 18, 2016
1 parent f6e1eb8 commit 3bf3126
Show file tree
Hide file tree
Showing 9 changed files with 488 additions and 147 deletions.
22 changes: 22 additions & 0 deletions hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
repositories {
mavenCentral()
jcenter()
maven { url 'https://dl.bintray.com/reactivesocket/ReactiveSocket' }
}


dependencies {
compile project(':hystrix-core')

compile 'io.reactivex:rxjava-reactive-streams:latest.release'

compile 'com.fasterxml.jackson.core:jackson-core:latest.release'
compile 'com.fasterxml.jackson.core:jackson-databind:latest.release'
compile 'com.fasterxml.jackson.core:jackson-annotations:latest.release'
compile 'com.fasterxml.jackson.module:jackson-module-afterburner:latest.release'
compile 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:latest.release'
compile 'io.reactivesocket:reactivesocket:latest.release'

testCompile 'junit:junit-dep:4.10'
testCompile 'org.mockito:mockito-all:1.9.5'
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,62 @@
package com.netflix.hystrix.contrib.reactivesocket;


public enum EventStreamEnum {
import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCollasperMetricsStream;
import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCommandMetricsStream;
import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixThreadPoolMetricsStream;
import com.netflix.hystrix.contrib.reactivesocket.requests.HystrixRequestEventsStream;
import com.netflix.hystrix.contrib.reactivesocket.sample.HystrixConfigStream;
import com.netflix.hystrix.contrib.reactivesocket.sample.HystrixUtilizationStream;
import io.reactivesocket.Payload;
import rx.Observable;

import java.util.Arrays;
import java.util.function.Supplier;

public enum EventStreamEnum implements Supplier<Observable<Payload>> {

CONFIG_STREAM(1) {
@Override
public Observable<Payload> get() {
return HystrixConfigStream.getInstance().get();
}
},
REQUEST_EVENT_STREAM(2) {
@Override
public Observable<Payload> get() {
return HystrixRequestEventsStream.getInstance().get();
}
},
UTILIZATION_EVENT_STREAM(3) {
@Override
public Observable<Payload> get() {
return HystrixUtilizationStream.getInstance().get();
}
},
METRICS_STREAM(4) {
@Override
public Observable<Payload> get() {
return Observable.merge(
HystrixCommandMetricsStream.getInstance().get(),
HystrixThreadPoolMetricsStream.getInstance().get(),
HystrixCollasperMetricsStream.getInstance().get());
}
}

;

private int typeId;

EventStreamEnum(int typeId) {
this.typeId = typeId;
}

public static EventStreamEnum findByTypeId(int typeId) {
return Arrays
.asList(EventStreamEnum.findByTypeId(typeId))
.stream()
.filter(t -> t.typeId == typeId)
.findAny()
.orElseThrow(() -> new IllegalStateException("no type id found for id => " + typeId));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,63 @@
package com.netflix.hystrix.contrib.reactivesocket;

import io.reactivesocket.Payload;
import io.reactivesocket.RequestHandler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.RxReactiveStreams;

/**
* Created by rroeser on 5/17/16.
* An implementation of {@link RequestHandler} that provides a Hystrix Stream. Takes an integer which corresponds to
* an id in {@link EventStreamEnum}. If the id is found it will be again stream the events to the subscriber.
*/
public class EventStreamRequestHandler {
public class EventStreamRequestHandler extends RequestHandler {
private static final Logger logger = LoggerFactory.getLogger(EventStreamRequestHandler.class);

@Override
public Publisher<Payload> handleRequestResponse(Payload payload) {
return NO_REQUEST_RESPONSE_HANDLER.apply(payload);
}

@Override
public Publisher<Payload> handleRequestStream(Payload payload) {
return NO_REQUEST_STREAM_HANDLER.apply(payload);
}

@Override
public Publisher<Payload> handleSubscription(Payload payload) {
Observable<Payload> defer = Observable
.defer(() -> {
try {
int typeId = payload
.getData()
.getInt(0);

EventStreamEnum eventStreamEnum = EventStreamEnum.findByTypeId(typeId);
return eventStreamEnum
.get();
} catch (Throwable t) {
logger.error(t.getMessage(), t);
return Observable.error(t);
}
});

return RxReactiveStreams.toPublisher(defer);
}

@Override
public Publisher<Void> handleFireAndForget(Payload payload) {
return NO_FIRE_AND_FORGET_HANDLER.apply(payload);
}

@Override
public Publisher<Payload> handleChannel(Payload initialPayload, Publisher<Payload> inputs) {
return NO_REQUEST_CHANNEL_HANDLER.apply(inputs);
}

@Override
public Publisher<Void> handleMetadataPush(Payload payload) {
return NO_METADATA_PUSH_HANDLER.apply(payload);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,77 @@
package com.netflix.hystrix.contrib.reactivesocket;

/**
* Created by rroeser on 5/18/16.
*/
public class StreamingSupplier {
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import io.reactivesocket.Frame;
import io.reactivesocket.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func0;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;

public abstract class StreamingSupplier<T> implements Supplier<Observable<Payload>> {

protected Logger logger = LoggerFactory.getLogger(StreamingSupplier.class);

protected BehaviorSubject<Payload> subject;

protected final JsonFactory jsonFactory;

protected StreamingSupplier() {
subject = BehaviorSubject.create();
jsonFactory = new JsonFactory();

Observable
.interval(500, TimeUnit.MILLISECONDS, Schedulers.computation())
.doOnNext(i ->
getStream()
.filter(this::filter)
.map(this::getPayloadData)
.forEach(b -> {
Payload p = new Payload() {
@Override
public ByteBuffer getData() {
return ByteBuffer.wrap(b);
}

@Override
public ByteBuffer getMetadata() {
return Frame.NULL_BYTEBUFFER;
}
};

subject.onNext(p);
})
);
}

public boolean filter(T t) {
return true;
}

@Override
public Observable<Payload> get() {
return subject;
}

protected abstract Stream<T> getStream();

protected abstract byte[] getPayloadData(T t);

protected void safelyWriteNumberField(JsonGenerator json, String name, Func0<Long> metricGenerator) throws IOException {
try {
json.writeNumberField(name, metricGenerator.call());
} catch (NoSuchFieldError error) {
logger.error("While publishing Hystrix metrics stream, error looking up eventType for : " + name + ". Please check that all Hystrix versions are the same!");
json.writeNumberField(name, 0L);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,3 @@ public Long call() {
}

}
}
Loading

0 comments on commit 3bf3126

Please sign in to comment.