diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle b/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle index 44a2d8e63..134136cc1 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle @@ -9,16 +9,16 @@ targetCompatibility = JavaVersion.VERSION_1_8 dependencies { compile project(':hystrix-core') - - compile 'io.reactivex:rxjava-reactive-streams:latest.release' - + compile 'com.fasterxml.jackson.core:jackson-core:latest.release' compile 'com.fasterxml.jackson.core:jackson-databind:latest.release' compile 'com.fasterxml.jackson.core:jackson-annotations:latest.release' compile 'com.fasterxml.jackson.module:jackson-module-afterburner:latest.release' compile 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:latest.release' compile 'io.reactivesocket:reactivesocket:latest.release' - + compile 'io.reactivesocket:reactivesocket-netty:latest.release' + compile 'io.reactivex:rxjava-reactive-streams:latest.release' + testCompile 'junit:junit-dep:4.10' testCompile 'org.mockito:mockito-all:1.9.5' } diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/BasePayloadSupplier.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/BasePayloadSupplier.java deleted file mode 100644 index 7d82215f3..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/BasePayloadSupplier.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.netflix.hystrix.contrib.reactivesocket; - -import com.fasterxml.jackson.dataformat.cbor.CBORFactory; -import io.reactivesocket.Payload; -import rx.Observable; -import rx.subjects.BehaviorSubject; - -import java.util.function.Supplier; - -public abstract class BasePayloadSupplier implements Supplier> { - protected final CBORFactory jsonFactory; - - protected final BehaviorSubject subject; - - protected BasePayloadSupplier() { - this.jsonFactory = new CBORFactory(); - this.subject = BehaviorSubject.create(); - } -} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java new file mode 100644 index 000000000..1a5a0907b --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java @@ -0,0 +1,92 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.reactivesocket; + +import com.netflix.hystrix.config.HystrixConfigurationStream; +import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixConfiguration; +import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixDashboardData; +import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixMetric; +import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixRequestEvents; +import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixUtilization; +import com.netflix.hystrix.metric.HystrixRequestEventsStream; +import com.netflix.hystrix.metric.sample.HystrixUtilizationStream; +import io.reactivesocket.Payload; +import rx.Observable; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +class EventStream implements Supplier> { + + private final static int CONFIGURATION_DATA_INTERVAL_IN_MS = 500; + private final static int UTILIZATION_DATA_INTERVAL_IN_MS = 500; + + private final Observable source; + private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); + + /* package-private */EventStream(Observable source) { + this.source = source + .doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true)) + .doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false)) + .share() + .onBackpressureDrop(); + } + + @Override + public Observable get() { + return source; + } + + public static EventStream getInstance(EventStreamEnum eventStreamEnum) { + final Observable source; + + switch (eventStreamEnum) { + case CONFIG_STREAM: + source = new HystrixConfigurationStream(CONFIGURATION_DATA_INTERVAL_IN_MS) + .observe() + .map(SerialHystrixConfiguration::toBytes) + .map(SerialHystrixMetric::toPayload); + break; + case REQUEST_EVENT_STREAM: + source = HystrixRequestEventsStream.getInstance() + .observe() + .map(SerialHystrixRequestEvents::toBytes) + .map(SerialHystrixMetric::toPayload); + break; + case UTILIZATION_STREAM: + source = new HystrixUtilizationStream(UTILIZATION_DATA_INTERVAL_IN_MS) + .observe() + .map(SerialHystrixUtilization::toBytes) + .map(SerialHystrixMetric::toPayload); + break; + case GENERAL_DASHBOARD_STREAM: + source = HystrixDashboardStream.getInstance() + .observe() + .map(SerialHystrixDashboardData::toBytes) + .map(SerialHystrixMetric::toPayload); + break; + default: + throw new IllegalArgumentException("Unknown EventStreamEnum : " + eventStreamEnum); + } + + return new EventStream(source); + } + + public boolean isSourceCurrentlySubscribed() { + return isSourceCurrentlySubscribed.get(); + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java index 5882e4c12..f407ddf6d 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java @@ -1,59 +1,27 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.netflix.hystrix.contrib.reactivesocket; - -import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCollapserMetricsStream; -import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCommandMetricsStream; -import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixThreadPoolMetricsStream; -import com.netflix.hystrix.contrib.reactivesocket.requests.HystrixRequestEventsStream; -import com.netflix.hystrix.contrib.reactivesocket.sample.HystrixConfigStream; -import com.netflix.hystrix.contrib.reactivesocket.sample.HystrixUtilizationStream; -import io.reactivesocket.Payload; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rx.Observable; - import java.util.Arrays; -import java.util.function.Supplier; - -public enum EventStreamEnum implements Supplier> { - - CONFIG_STREAM(1) { - @Override - public Observable get() { - logger.info("streaming config data"); - return HystrixConfigStream.getInstance().get(); - } - }, - REQUEST_EVENT_STREAM(2) { - @Override - public Observable get() { - logger.info("streaming request events"); - return HystrixRequestEventsStream.getInstance().get(); - } - }, - UTILIZATION_EVENT_STREAM(3) { - @Override - public Observable get() { - logger.info("streaming utilization events"); - return HystrixUtilizationStream.getInstance().get(); - } - }, - METRICS_STREAM(4) { - @Override - public Observable get() { - logger.info("streaming metrics"); - return Observable.merge( - HystrixCommandMetricsStream.getInstance().get(), - HystrixThreadPoolMetricsStream.getInstance().get(), - HystrixCollapserMetricsStream.getInstance().get()); - } - } - ; +public enum EventStreamEnum { - private static final Logger logger = LoggerFactory.getLogger(EventStreamEnum.class); + CONFIG_STREAM(1), REQUEST_EVENT_STREAM(2), UTILIZATION_STREAM(3), GENERAL_DASHBOARD_STREAM(4); - private int typeId; + private final int typeId; EventStreamEnum(int typeId) { this.typeId = typeId; @@ -61,11 +29,11 @@ public Observable get() { public static EventStreamEnum findByTypeId(int typeId) { return Arrays - .asList(EventStreamEnum.values()) - .stream() - .filter(t -> t.typeId == typeId) - .findAny() - .orElseThrow(() -> new IllegalStateException("no type id found for id => " + typeId)); + .asList(EventStreamEnum.values()) + .stream() + .filter(t -> t.typeId == typeId) + .findAny() + .orElseThrow(() -> new IllegalStateException("no type id found for id => " + typeId)); } public int getTypeId() { diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java index 35b90e86d..98e5b69b2 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java @@ -1,7 +1,23 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.netflix.hystrix.contrib.reactivesocket; import io.reactivesocket.Payload; import io.reactivesocket.RequestHandler; +import org.agrona.BitUtil; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,26 +34,48 @@ public class EventStreamRequestHandler extends RequestHandler { @Override public Publisher handleRequestResponse(Payload payload) { - return NO_REQUEST_RESPONSE_HANDLER.apply(payload); + Observable singleResponse = Observable.defer(() -> { + try { + int typeId = payload.getData().getInt(0); + EventStreamEnum eventStreamEnum = EventStreamEnum.findByTypeId(typeId); + EventStream eventStream = EventStream.getInstance(eventStreamEnum); + return eventStream.get().take(1); + } catch (Throwable t) { + logger.error(t.getMessage(), t); + return Observable.error(t); + } + }); + + return RxReactiveStreams.toPublisher(singleResponse); } @Override public Publisher handleRequestStream(Payload payload) { - return NO_REQUEST_STREAM_HANDLER.apply(payload); + Observable multiResponse = Observable.defer(() -> { + try { + int typeId = payload.getData().getInt(0); + int numRequested = payload.getData().getInt(BitUtil.SIZE_OF_INT); + EventStreamEnum eventStreamEnum = EventStreamEnum.findByTypeId(typeId); + EventStream eventStream = EventStream.getInstance(eventStreamEnum); + return eventStream.get().take(numRequested); + } catch (Throwable t) { + logger.error(t.getMessage(), t); + return Observable.error(t); + } + }); + + return RxReactiveStreams.toPublisher(multiResponse); } @Override public Publisher handleSubscription(Payload payload) { - Observable defer = Observable + Observable infiniteResponse = Observable .defer(() -> { try { - int typeId = payload - .getData() - .getInt(0); - + int typeId = payload.getData().getInt(0); EventStreamEnum eventStreamEnum = EventStreamEnum.findByTypeId(typeId); - return eventStreamEnum - .get(); + EventStream eventStream = EventStream.getInstance(eventStreamEnum); + return eventStream.get(); } catch (Throwable t) { logger.error(t.getMessage(), t); return Observable.error(t); @@ -45,8 +83,7 @@ public Publisher handleSubscription(Payload payload) { }) .onBackpressureDrop(); - return RxReactiveStreams - .toPublisher(defer); + return RxReactiveStreams.toPublisher(infiniteResponse); } @Override diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/HystrixDashboardStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/HystrixDashboardStream.java new file mode 100644 index 000000000..370d9be87 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/HystrixDashboardStream.java @@ -0,0 +1,92 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.reactivesocket; + +import com.netflix.hystrix.HystrixCollapserMetrics; +import com.netflix.hystrix.HystrixCommandMetrics; +import com.netflix.hystrix.HystrixThreadPoolMetrics; +import rx.Observable; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class HystrixDashboardStream { + final int delayInMs; + final Observable singleSource; + final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); + + private HystrixDashboardStream(int delayInMs) { + this.delayInMs = delayInMs; + this.singleSource = Observable.interval(delayInMs, TimeUnit.MILLISECONDS) + .map(timestamp -> new DashboardData( + HystrixCommandMetrics.getInstances(), + HystrixThreadPoolMetrics.getInstances(), + HystrixCollapserMetrics.getInstances() + )) + .doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true)) + .doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false)) + .share() + .onBackpressureDrop(); + } + + private static final HystrixDashboardStream INSTANCE = new HystrixDashboardStream(500); + + public static HystrixDashboardStream getInstance() { + return INSTANCE; + } + + static HystrixDashboardStream getNonSingletonInstanceOnlyUsedInUnitTests(int delayInMs) { + return new HystrixDashboardStream(delayInMs); + } + + /** + * Return a ref-counted stream that will only do work when at least one subscriber is present + */ + public Observable observe() { + return singleSource; + } + + public boolean isSourceCurrentlySubscribed() { + return isSourceCurrentlySubscribed.get(); + } + + public static class DashboardData { + final Collection commandMetrics; + final Collection threadPoolMetrics; + final Collection collapserMetrics; + + public DashboardData(Collection commandMetrics, Collection threadPoolMetrics, Collection collapserMetrics) { + this.commandMetrics = commandMetrics; + this.threadPoolMetrics = threadPoolMetrics; + this.collapserMetrics = collapserMetrics; + } + + public Collection getCommandMetrics() { + return commandMetrics; + } + + public Collection getThreadPoolMetrics() { + return threadPoolMetrics; + } + + public Collection getCollapserMetrics() { + return collapserMetrics; + } + } +} + + diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java deleted file mode 100644 index 807e2d50f..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/StreamingSupplier.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.netflix.hystrix.contrib.reactivesocket; - -import com.fasterxml.jackson.core.JsonGenerator; -import io.reactivesocket.Frame; -import io.reactivesocket.Payload; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rx.Observable; -import rx.functions.Func0; -import rx.schedulers.Schedulers; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -public abstract class StreamingSupplier extends BasePayloadSupplier { - - protected Logger logger = LoggerFactory.getLogger(StreamingSupplier.class); - - protected StreamingSupplier() { - - Observable - .interval(500, TimeUnit.MILLISECONDS, Schedulers.computation()) - .doOnNext(i -> - getStream() - .filter(this::filter) - .map(this::getPayloadData) - .forEach(b -> { - Payload p = new Payload() { - @Override - public ByteBuffer getData() { - return ByteBuffer.wrap(b); - } - - @Override - public ByteBuffer getMetadata() { - return Frame.NULL_BYTEBUFFER; - } - }; - - subject.onNext(p); - }) - ) - .retry() - .subscribe(); - } - - public boolean filter(T t) { - return true; - } - - @Override - public Observable get() { - return subject; - } - - protected abstract Stream getStream(); - - protected abstract byte[] getPayloadData(T t); - - protected void safelyWriteNumberField(JsonGenerator json, String name, Func0 metricGenerator) throws IOException { - try { - json.writeNumberField(name, metricGenerator.call()); - } catch (NoSuchFieldError error) { - logger.error("While publishing Hystrix metrics stream, error looking up eventType for : " + name + ". Please check that all Hystrix versions are the same!"); - json.writeNumberField(name, 0L); - } - } -} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/client/HystrixMetricsReactiveSocketClient.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/client/HystrixMetricsReactiveSocketClient.java new file mode 100644 index 000000000..1efd73f6e --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/client/HystrixMetricsReactiveSocketClient.java @@ -0,0 +1,92 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.reactivesocket.client; + +import com.netflix.hystrix.contrib.reactivesocket.EventStreamEnum; +import io.netty.channel.nio.NioEventLoopGroup; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.DefaultReactiveSocket; +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.netty.tcp.client.ClientTcpDuplexConnection; +import org.agrona.BitUtil; +import org.reactivestreams.Publisher; +import rx.RxReactiveStreams; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +public class HystrixMetricsReactiveSocketClient { + + private final ReactiveSocket reactiveSocket; + + public HystrixMetricsReactiveSocketClient(String host, int port, NioEventLoopGroup eventLoopGroup) { + ClientTcpDuplexConnection duplexConnection = RxReactiveStreams.toObservable( + ClientTcpDuplexConnection.create(InetSocketAddress.createUnresolved(host, port), eventLoopGroup) + ).toBlocking().single(); + + this.reactiveSocket = DefaultReactiveSocket + .fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), Throwable::printStackTrace); + } + + public void startAndWait() { + reactiveSocket.startAndWait(); + } + + public Publisher requestResponse(EventStreamEnum eventStreamEnum) { + return reactiveSocket.requestResponse(createPayload(eventStreamEnum)); + } + + public Publisher requestStream(EventStreamEnum eventStreamEnum, int numRequested) { + return reactiveSocket.requestStream(createPayload(eventStreamEnum, numRequested)); + } + + public Publisher requestSubscription(EventStreamEnum eventStreamEnum) { + return reactiveSocket.requestSubscription(createPayload(eventStreamEnum)); + } + + private static Payload createPayload(EventStreamEnum eventStreamEnum) { + return new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer.allocate(BitUtil.SIZE_OF_INT) + .putInt(0, eventStreamEnum.getTypeId()); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }; + } + + private static Payload createPayload(EventStreamEnum eventStreamEnum, int numRequested) { + return new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer.allocate(BitUtil.SIZE_OF_INT * 2) + .putInt(0, eventStreamEnum.getTypeId()) + .putInt(BitUtil.SIZE_OF_INT, numRequested); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }; + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStream.java deleted file mode 100644 index d936c865e..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStream.java +++ /dev/null @@ -1,106 +0,0 @@ -package com.netflix.hystrix.contrib.reactivesocket.metrics; - - -import com.fasterxml.jackson.core.JsonGenerator; -import com.netflix.hystrix.HystrixCollapserKey; -import com.netflix.hystrix.HystrixCollapserMetrics; -import com.netflix.hystrix.HystrixEventType; -import com.netflix.hystrix.contrib.reactivesocket.StreamingSupplier; -import org.agrona.LangUtil; -import rx.functions.Func0; - -import java.io.ByteArrayOutputStream; -import java.util.stream.Stream; - -public class HystrixCollapserMetricsStream extends StreamingSupplier { - private static HystrixCollapserMetricsStream INSTANCE = new HystrixCollapserMetricsStream(); - - private HystrixCollapserMetricsStream() { - super(); - } - - public static HystrixCollapserMetricsStream getInstance() { - return INSTANCE; - } - - @Override - protected Stream getStream() { - return HystrixCollapserMetrics.getInstances().stream(); - } - - protected byte[] getPayloadData(final HystrixCollapserMetrics collapserMetrics) { - byte[] retVal = null; - try { - HystrixCollapserKey key = collapserMetrics.getCollapserKey(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - JsonGenerator json = jsonFactory.createGenerator(bos); - json.writeStartObject(); - - json.writeStringField("type", "HystrixCollapser"); - json.writeStringField("name", key.name()); - json.writeNumberField("currentTime", System.currentTimeMillis()); - - safelyWriteNumberField(json, "rollingCountRequestsBatched", new Func0() { - @Override - public Long call() { - return collapserMetrics.getRollingCount(HystrixEventType.Collapser.ADDED_TO_BATCH); - } - }); - safelyWriteNumberField(json, "rollingCountBatches", new Func0() { - @Override - public Long call() { - return collapserMetrics.getRollingCount(HystrixEventType.Collapser.BATCH_EXECUTED); - } - }); - safelyWriteNumberField(json, "rollingCountResponsesFromCache", new Func0() { - @Override - public Long call() { - return collapserMetrics.getRollingCount(HystrixEventType.Collapser.RESPONSE_FROM_CACHE); - } - }); - - // batch size percentiles - json.writeNumberField("batchSize_mean", collapserMetrics.getBatchSizeMean()); - json.writeObjectFieldStart("batchSize"); - json.writeNumberField("25", collapserMetrics.getBatchSizePercentile(25)); - json.writeNumberField("50", collapserMetrics.getBatchSizePercentile(50)); - json.writeNumberField("75", collapserMetrics.getBatchSizePercentile(75)); - json.writeNumberField("90", collapserMetrics.getBatchSizePercentile(90)); - json.writeNumberField("95", collapserMetrics.getBatchSizePercentile(95)); - json.writeNumberField("99", collapserMetrics.getBatchSizePercentile(99)); - json.writeNumberField("99.5", collapserMetrics.getBatchSizePercentile(99.5)); - json.writeNumberField("100", collapserMetrics.getBatchSizePercentile(100)); - json.writeEndObject(); - - // shard size percentiles (commented-out for now) - //json.writeNumberField("shardSize_mean", collapserMetrics.getShardSizeMean()); - //json.writeObjectFieldStart("shardSize"); - //json.writeNumberField("25", collapserMetrics.getShardSizePercentile(25)); - //json.writeNumberField("50", collapserMetrics.getShardSizePercentile(50)); - //json.writeNumberField("75", collapserMetrics.getShardSizePercentile(75)); - //json.writeNumberField("90", collapserMetrics.getShardSizePercentile(90)); - //json.writeNumberField("95", collapserMetrics.getShardSizePercentile(95)); - //json.writeNumberField("99", collapserMetrics.getShardSizePercentile(99)); - //json.writeNumberField("99.5", collapserMetrics.getShardSizePercentile(99.5)); - //json.writeNumberField("100", collapserMetrics.getShardSizePercentile(100)); - //json.writeEndObject(); - - //json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", collapserMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get()); - json.writeBooleanField("propertyValue_requestCacheEnabled", collapserMetrics.getProperties().requestCacheEnabled().get()); - json.writeNumberField("propertyValue_maxRequestsInBatch", collapserMetrics.getProperties().maxRequestsInBatch().get()); - json.writeNumberField("propertyValue_timerDelayInMilliseconds", collapserMetrics.getProperties().timerDelayInMilliseconds().get()); - - json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster - - json.writeEndObject(); - json.close(); - - retVal = bos.toByteArray(); - } catch (Exception e) { - LangUtil.rethrowUnchecked(e); - } - - return retVal; - } - -} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStream.java deleted file mode 100644 index f426452b1..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStream.java +++ /dev/null @@ -1,239 +0,0 @@ -package com.netflix.hystrix.contrib.reactivesocket.metrics; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.netflix.hystrix.HystrixCircuitBreaker; -import com.netflix.hystrix.HystrixCommandKey; -import com.netflix.hystrix.HystrixCommandMetrics; -import com.netflix.hystrix.HystrixCommandProperties; -import com.netflix.hystrix.HystrixEventType; -import com.netflix.hystrix.contrib.reactivesocket.StreamingSupplier; -import org.agrona.LangUtil; -import rx.functions.Func0; - -import java.io.ByteArrayOutputStream; -import java.util.stream.Stream; - -public class HystrixCommandMetricsStream extends StreamingSupplier { - private static final HystrixCommandMetricsStream INSTANCE = new HystrixCommandMetricsStream(); - - private HystrixCommandMetricsStream() { - super(); - } - - public static HystrixCommandMetricsStream getInstance() { - return INSTANCE; - } - - @Override - protected Stream getStream() { - return HystrixCommandMetrics.getInstances().stream(); - } - - protected byte[] getPayloadData(final HystrixCommandMetrics commandMetrics) { - byte[] retVal = null; - - try - - { - HystrixCommandKey key = commandMetrics.getCommandKey(); - HystrixCircuitBreaker circuitBreaker = HystrixCircuitBreaker.Factory.getInstance(key); - - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - - JsonGenerator json = jsonFactory.createGenerator(bos); - - json.writeStartObject(); - json.writeStringField("type", "HystrixCommand"); - json.writeStringField("name", key.name()); - json.writeStringField("group", commandMetrics.getCommandGroup().name()); - json.writeNumberField("currentTime", System.currentTimeMillis()); - - // circuit breaker - if (circuitBreaker == null) { - // circuit breaker is disabled and thus never open - json.writeBooleanField("isCircuitBreakerOpen", false); - } else { - json.writeBooleanField("isCircuitBreakerOpen", circuitBreaker.isOpen()); - } - HystrixCommandMetrics.HealthCounts healthCounts = commandMetrics.getHealthCounts(); - json.writeNumberField("errorPercentage", healthCounts.getErrorPercentage()); - json.writeNumberField("errorCount", healthCounts.getErrorCount()); - json.writeNumberField("requestCount", healthCounts.getTotalRequests()); - - // rolling counters - safelyWriteNumberField(json, "rollingCountBadRequests", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.BAD_REQUEST); - } - }); - safelyWriteNumberField(json, "rollingCountCollapsedRequests", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.COLLAPSED); - } - }); - safelyWriteNumberField(json, "rollingCountEmit", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.EMIT); - } - }); - safelyWriteNumberField(json, "rollingCountExceptionsThrown", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.EXCEPTION_THROWN); - } - }); - safelyWriteNumberField(json, "rollingCountFailure", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.FAILURE); - } - }); - safelyWriteNumberField(json, "rollingCountFallbackEmit", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_EMIT); - } - }); - safelyWriteNumberField(json, "rollingCountFallbackFailure", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_FAILURE); - } - }); - safelyWriteNumberField(json, "rollingCountFallbackMissing", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_MISSING); - } - }); - safelyWriteNumberField(json, "rollingCountFallbackRejection", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_REJECTION); - } - }); - safelyWriteNumberField(json, "rollingCountFallbackSuccess", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_SUCCESS); - } - }); - safelyWriteNumberField(json, "rollingCountResponsesFromCache", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.RESPONSE_FROM_CACHE); - } - }); - safelyWriteNumberField(json, "rollingCountSemaphoreRejected", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.SEMAPHORE_REJECTED); - } - }); - safelyWriteNumberField(json, "rollingCountShortCircuited", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.SHORT_CIRCUITED); - } - }); - safelyWriteNumberField(json, "rollingCountSuccess", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.SUCCESS); - } - }); - safelyWriteNumberField(json, "rollingCountThreadPoolRejected", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.THREAD_POOL_REJECTED); - } - }); - safelyWriteNumberField(json, "rollingCountTimeout", new Func0() { - @Override - public Long call() { - return commandMetrics.getRollingCount(HystrixEventType.TIMEOUT); - } - }); - - json.writeNumberField("currentConcurrentExecutionCount", commandMetrics.getCurrentConcurrentExecutionCount()); - json.writeNumberField("rollingMaxConcurrentExecutionCount", commandMetrics.getRollingMaxConcurrentExecutions()); - - // latency percentiles - json.writeNumberField("latencyExecute_mean", commandMetrics.getExecutionTimeMean()); - json.writeObjectFieldStart("latencyExecute"); - json.writeNumberField("0", commandMetrics.getExecutionTimePercentile(0)); - json.writeNumberField("25", commandMetrics.getExecutionTimePercentile(25)); - json.writeNumberField("50", commandMetrics.getExecutionTimePercentile(50)); - json.writeNumberField("75", commandMetrics.getExecutionTimePercentile(75)); - json.writeNumberField("90", commandMetrics.getExecutionTimePercentile(90)); - json.writeNumberField("95", commandMetrics.getExecutionTimePercentile(95)); - json.writeNumberField("99", commandMetrics.getExecutionTimePercentile(99)); - json.writeNumberField("99.5", commandMetrics.getExecutionTimePercentile(99.5)); - json.writeNumberField("100", commandMetrics.getExecutionTimePercentile(100)); - json.writeEndObject(); - // - json.writeNumberField("latencyTotal_mean", commandMetrics.getTotalTimeMean()); - json.writeObjectFieldStart("latencyTotal"); - json.writeNumberField("0", commandMetrics.getTotalTimePercentile(0)); - json.writeNumberField("25", commandMetrics.getTotalTimePercentile(25)); - json.writeNumberField("50", commandMetrics.getTotalTimePercentile(50)); - json.writeNumberField("75", commandMetrics.getTotalTimePercentile(75)); - json.writeNumberField("90", commandMetrics.getTotalTimePercentile(90)); - json.writeNumberField("95", commandMetrics.getTotalTimePercentile(95)); - json.writeNumberField("99", commandMetrics.getTotalTimePercentile(99)); - json.writeNumberField("99.5", commandMetrics.getTotalTimePercentile(99.5)); - json.writeNumberField("100", commandMetrics.getTotalTimePercentile(100)); - json.writeEndObject(); - - // property values for reporting what is actually seen by the command rather than what was set somewhere - HystrixCommandProperties commandProperties = commandMetrics.getProperties(); - - json.writeNumberField("propertyValue_circuitBreakerRequestVolumeThreshold", commandProperties.circuitBreakerRequestVolumeThreshold().get()); - json.writeNumberField("propertyValue_circuitBreakerSleepWindowInMilliseconds", commandProperties.circuitBreakerSleepWindowInMilliseconds().get()); - json.writeNumberField("propertyValue_circuitBreakerErrorThresholdPercentage", commandProperties.circuitBreakerErrorThresholdPercentage().get()); - json.writeBooleanField("propertyValue_circuitBreakerForceOpen", commandProperties.circuitBreakerForceOpen().get()); - json.writeBooleanField("propertyValue_circuitBreakerForceClosed", commandProperties.circuitBreakerForceClosed().get()); - json.writeBooleanField("propertyValue_circuitBreakerEnabled", commandProperties.circuitBreakerEnabled().get()); - - json.writeStringField("propertyValue_executionIsolationStrategy", commandProperties.executionIsolationStrategy().get().name()); - json.writeNumberField("propertyValue_executionIsolationThreadTimeoutInMilliseconds", commandProperties.executionTimeoutInMilliseconds().get()); - json.writeNumberField("propertyValue_executionTimeoutInMilliseconds", commandProperties.executionTimeoutInMilliseconds().get()); - json.writeBooleanField("propertyValue_executionIsolationThreadInterruptOnTimeout", commandProperties.executionIsolationThreadInterruptOnTimeout().get()); - json.writeStringField("propertyValue_executionIsolationThreadPoolKeyOverride", commandProperties.executionIsolationThreadPoolKeyOverride().get()); - json.writeNumberField("propertyValue_executionIsolationSemaphoreMaxConcurrentRequests", commandProperties.executionIsolationSemaphoreMaxConcurrentRequests().get()); - json.writeNumberField("propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests", commandProperties.fallbackIsolationSemaphoreMaxConcurrentRequests().get()); - - /* - * The following are commented out as these rarely change and are verbose for streaming for something people don't change. - * We could perhaps allow a property or request argument to include these. - */ - - // json.put("propertyValue_metricsRollingPercentileEnabled", commandProperties.metricsRollingPercentileEnabled().get()); - // json.put("propertyValue_metricsRollingPercentileBucketSize", commandProperties.metricsRollingPercentileBucketSize().get()); - // json.put("propertyValue_metricsRollingPercentileWindow", commandProperties.metricsRollingPercentileWindowInMilliseconds().get()); - // json.put("propertyValue_metricsRollingPercentileWindowBuckets", commandProperties.metricsRollingPercentileWindowBuckets().get()); - // json.put("propertyValue_metricsRollingStatisticalWindowBuckets", commandProperties.metricsRollingStatisticalWindowBuckets().get()); - json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", commandProperties.metricsRollingStatisticalWindowInMilliseconds().get()); - - json.writeBooleanField("propertyValue_requestCacheEnabled", commandProperties.requestCacheEnabled().get()); - json.writeBooleanField("propertyValue_requestLogEnabled", commandProperties.requestLogEnabled().get()); - - json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster - json.writeStringField("threadPool", commandMetrics.getThreadPoolKey().name()); - - json.writeEndObject(); - json.close(); - - retVal = bos.toByteArray(); - } catch (Exception t) { - LangUtil.rethrowUnchecked(t); - } - - return retVal; - } -} - - diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixThreadPoolMetricsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixThreadPoolMetricsStream.java deleted file mode 100644 index c2407934d..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixThreadPoolMetricsStream.java +++ /dev/null @@ -1,94 +0,0 @@ -package com.netflix.hystrix.contrib.reactivesocket.metrics; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.netflix.hystrix.HystrixEventType; -import com.netflix.hystrix.HystrixThreadPoolKey; -import com.netflix.hystrix.HystrixThreadPoolMetrics; -import com.netflix.hystrix.contrib.reactivesocket.StreamingSupplier; -import io.reactivesocket.Payload; -import org.agrona.LangUtil; -import rx.Observable; -import rx.functions.Func0; - -import java.io.ByteArrayOutputStream; -import java.util.stream.Stream; - -public class HystrixThreadPoolMetricsStream extends StreamingSupplier { - private static HystrixThreadPoolMetricsStream INSTANCE = new HystrixThreadPoolMetricsStream(); - - private HystrixThreadPoolMetricsStream() { - super(); - } - - public static HystrixThreadPoolMetricsStream getInstance() { - return INSTANCE; - } - - @Override - public boolean filter(HystrixThreadPoolMetrics threadPoolMetrics) { - return threadPoolMetrics.getCurrentCompletedTaskCount().intValue() > 0; - } - - @Override - public Observable get() { - return super.get(); - } - - @Override - protected byte[] getPayloadData(HystrixThreadPoolMetrics threadPoolMetrics) { - byte[] retVal = null; - - try { - HystrixThreadPoolKey key = threadPoolMetrics.getThreadPoolKey(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - JsonGenerator json = jsonFactory.createGenerator(bos); - json.writeStartObject(); - - json.writeStringField("type", "HystrixThreadPool"); - json.writeStringField("name", key.name()); - json.writeNumberField("currentTime", System.currentTimeMillis()); - - json.writeNumberField("currentActiveCount", threadPoolMetrics.getCurrentActiveCount().intValue()); - json.writeNumberField("currentCompletedTaskCount", threadPoolMetrics.getCurrentCompletedTaskCount().longValue()); - json.writeNumberField("currentCorePoolSize", threadPoolMetrics.getCurrentCorePoolSize().intValue()); - json.writeNumberField("currentLargestPoolSize", threadPoolMetrics.getCurrentLargestPoolSize().intValue()); - json.writeNumberField("currentMaximumPoolSize", threadPoolMetrics.getCurrentMaximumPoolSize().intValue()); - json.writeNumberField("currentPoolSize", threadPoolMetrics.getCurrentPoolSize().intValue()); - json.writeNumberField("currentQueueSize", threadPoolMetrics.getCurrentQueueSize().intValue()); - json.writeNumberField("currentTaskCount", threadPoolMetrics.getCurrentTaskCount().longValue()); - safelyWriteNumberField(json, "rollingCountThreadsExecuted", new Func0() { - @Override - public Long call() { - return threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.EXECUTED); - } - }); - json.writeNumberField("rollingMaxActiveThreads", threadPoolMetrics.getRollingMaxActiveThreads()); - safelyWriteNumberField(json, "rollingCountCommandRejections", new Func0() { - @Override - public Long call() { - return threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.REJECTED); - } - }); - - json.writeNumberField("propertyValue_queueSizeRejectionThreshold", threadPoolMetrics.getProperties().queueSizeRejectionThreshold().get()); - json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", threadPoolMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get()); - - json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster - - json.writeEndObject(); - json.close(); - - retVal = bos.toByteArray(); - - } catch (Exception e) { - LangUtil.rethrowUnchecked(e); - } - - return retVal; - } - - @Override - protected Stream getStream() { - return HystrixThreadPoolMetrics.getInstances().stream(); - } -} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java deleted file mode 100644 index 64f780f3d..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStream.java +++ /dev/null @@ -1,165 +0,0 @@ -package com.netflix.hystrix.contrib.reactivesocket.sample; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.netflix.hystrix.HystrixCollapserKey; -import com.netflix.hystrix.HystrixCommandKey; -import com.netflix.hystrix.HystrixThreadPoolKey; -import com.netflix.hystrix.config.HystrixCollapserConfiguration; -import com.netflix.hystrix.config.HystrixCommandConfiguration; -import com.netflix.hystrix.config.HystrixConfiguration; -import com.netflix.hystrix.config.HystrixConfigurationStream; -import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; -import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier; -import io.reactivesocket.Frame; -import io.reactivesocket.Payload; -import org.agrona.LangUtil; -import rx.Observable; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Map; - -public class HystrixConfigStream extends BasePayloadSupplier { - private static final HystrixConfigStream INSTANCE = new HystrixConfigStream(); - - private HystrixConfigStream() { - super(); - - HystrixConfigurationStream stream = new HystrixConfigurationStream(100); - stream - .observe() - .map(this::getPayloadData) - .map(b -> new Payload() { - @Override - public ByteBuffer getData() { - return ByteBuffer.wrap(b); - } - - @Override - public ByteBuffer getMetadata() { - return Frame.NULL_BYTEBUFFER; - } - }) - .subscribe(subject); - } - - public static HystrixConfigStream getInstance() { - return INSTANCE; - } - - @Override - public Observable get() { - return subject; - } - - public byte[] getPayloadData(HystrixConfiguration config) { - byte[] retVal = null; - - try { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - JsonGenerator json = jsonFactory.createGenerator(bos); - - json.writeStartObject(); - json.writeStringField("type", "HystrixConfig"); - json.writeObjectFieldStart("commands"); - for (Map.Entry entry: config.getCommandConfig().entrySet()) { - final HystrixCommandKey key = entry.getKey(); - final HystrixCommandConfiguration commandConfig = entry.getValue(); - writeCommandConfigJson(json, key, commandConfig); - - } - json.writeEndObject(); - - json.writeObjectFieldStart("threadpools"); - for (Map.Entry entry: config.getThreadPoolConfig().entrySet()) { - final HystrixThreadPoolKey threadPoolKey = entry.getKey(); - final HystrixThreadPoolConfiguration threadPoolConfig = entry.getValue(); - writeThreadPoolConfigJson(json, threadPoolKey, threadPoolConfig); - } - json.writeEndObject(); - - json.writeObjectFieldStart("collapsers"); - for (Map.Entry entry: config.getCollapserConfig().entrySet()) { - final HystrixCollapserKey collapserKey = entry.getKey(); - final HystrixCollapserConfiguration collapserConfig = entry.getValue(); - writeCollapserConfigJson(json, collapserKey, collapserConfig); - } - json.writeEndObject(); - json.writeEndObject(); - json.close(); - - - retVal = bos.toByteArray(); - } catch (Exception e) { - LangUtil.rethrowUnchecked(e); - } - - return retVal; - } - - private static void writeCommandConfigJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandConfiguration commandConfig) throws IOException { - json.writeObjectFieldStart(key.name()); - json.writeStringField("threadPoolKey", commandConfig.getThreadPoolKey().name()); - json.writeStringField("groupKey", commandConfig.getGroupKey().name()); - json.writeObjectFieldStart("execution"); - HystrixCommandConfiguration.HystrixCommandExecutionConfig executionConfig = commandConfig.getExecutionConfig(); - json.writeStringField("isolationStrategy", executionConfig.getIsolationStrategy().name()); - json.writeStringField("threadPoolKeyOverride", executionConfig.getThreadPoolKeyOverride()); - json.writeBooleanField("requestCacheEnabled", executionConfig.isRequestCacheEnabled()); - json.writeBooleanField("requestLogEnabled", executionConfig.isRequestLogEnabled()); - json.writeBooleanField("timeoutEnabled", executionConfig.isTimeoutEnabled()); - json.writeBooleanField("fallbackEnabled", executionConfig.isFallbackEnabled()); - json.writeNumberField("timeoutInMilliseconds", executionConfig.getTimeoutInMilliseconds()); - json.writeNumberField("semaphoreSize", executionConfig.getSemaphoreMaxConcurrentRequests()); - json.writeNumberField("fallbackSemaphoreSize", executionConfig.getFallbackMaxConcurrentRequest()); - json.writeBooleanField("threadInterruptOnTimeout", executionConfig.isThreadInterruptOnTimeout()); - json.writeEndObject(); - json.writeObjectFieldStart("metrics"); - HystrixCommandConfiguration.HystrixCommandMetricsConfig metricsConfig = commandConfig.getMetricsConfig(); - json.writeNumberField("healthBucketSizeInMs", metricsConfig.getHealthIntervalInMilliseconds()); - json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); - json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); - json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); - json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); - json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); - json.writeEndObject(); - json.writeObjectFieldStart("circuitBreaker"); - HystrixCommandConfiguration.HystrixCommandCircuitBreakerConfig circuitBreakerConfig = commandConfig.getCircuitBreakerConfig(); - json.writeBooleanField("enabled", circuitBreakerConfig.isEnabled()); - json.writeBooleanField("isForcedOpen", circuitBreakerConfig.isForceOpen()); - json.writeBooleanField("isForcedClosed", circuitBreakerConfig.isForceOpen()); - json.writeNumberField("requestVolumeThreshold", circuitBreakerConfig.getRequestVolumeThreshold()); - json.writeNumberField("errorPercentageThreshold", circuitBreakerConfig.getErrorThresholdPercentage()); - json.writeNumberField("sleepInMilliseconds", circuitBreakerConfig.getSleepWindowInMilliseconds()); - json.writeEndObject(); - json.writeEndObject(); - } - - private static void writeThreadPoolConfigJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolConfiguration threadPoolConfig) throws IOException { - json.writeObjectFieldStart(threadPoolKey.name()); - json.writeNumberField("coreSize", threadPoolConfig.getCoreSize()); - json.writeNumberField("maxQueueSize", threadPoolConfig.getMaxQueueSize()); - json.writeNumberField("queueRejectionThreshold", threadPoolConfig.getQueueRejectionThreshold()); - json.writeNumberField("keepAliveTimeInMinutes", threadPoolConfig.getKeepAliveTimeInMinutes()); - json.writeNumberField("counterBucketSizeInMilliseconds", threadPoolConfig.getRollingCounterBucketSizeInMilliseconds()); - json.writeNumberField("counterBucketCount", threadPoolConfig.getRollingCounterNumberOfBuckets()); - json.writeEndObject(); - } - - private static void writeCollapserConfigJson(JsonGenerator json, HystrixCollapserKey collapserKey, HystrixCollapserConfiguration collapserConfig) throws IOException { - json.writeObjectFieldStart(collapserKey.name()); - json.writeNumberField("maxRequestsInBatch", collapserConfig.getMaxRequestsInBatch()); - json.writeNumberField("timerDelayInMilliseconds", collapserConfig.getTimerDelayInMilliseconds()); - json.writeBooleanField("requestCacheEnabled", collapserConfig.isRequestCacheEnabled()); - json.writeObjectFieldStart("metrics"); - HystrixCollapserConfiguration.CollapserMetricsConfig metricsConfig = collapserConfig.getCollapserMetricsConfig(); - json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); - json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); - json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); - json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); - json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); - json.writeEndObject(); - json.writeEndObject(); - } -} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java deleted file mode 100644 index b5f9257da..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixUtilizationStream.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.netflix.hystrix.contrib.reactivesocket.sample; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.netflix.hystrix.HystrixCommandKey; -import com.netflix.hystrix.HystrixThreadPoolKey; -import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier; -import com.netflix.hystrix.metric.sample.HystrixCommandUtilization; -import com.netflix.hystrix.metric.sample.HystrixThreadPoolUtilization; -import com.netflix.hystrix.metric.sample.HystrixUtilization; -import io.reactivesocket.Frame; -import io.reactivesocket.Payload; -import org.agrona.LangUtil; -import rx.Observable; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Map; - -public class HystrixUtilizationStream extends BasePayloadSupplier { - private static final HystrixUtilizationStream INSTANCE = new HystrixUtilizationStream(); - - private HystrixUtilizationStream() { - super(); - - com.netflix.hystrix.metric.sample.HystrixUtilizationStream stream - = new com.netflix.hystrix.metric.sample.HystrixUtilizationStream(100); - stream - .observe() - .map(this::getPayloadData) - .map(b -> new Payload() { - @Override - public ByteBuffer getData() { - return ByteBuffer.wrap(b); - } - - @Override - public ByteBuffer getMetadata() { - return Frame.NULL_BYTEBUFFER; - } - }) - .subscribe(subject); - } - - public static HystrixUtilizationStream getInstance() { - return INSTANCE; - } - - @Override - public Observable get() { - return subject; - } - - public byte[] getPayloadData(HystrixUtilization utilization) { - byte[] retVal = null; - - try { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - JsonGenerator json = jsonFactory.createGenerator(bos); - - json.writeStartObject(); - json.writeStringField("type", "HystrixUtilization"); - json.writeObjectFieldStart("commands"); - for (Map.Entry entry: utilization.getCommandUtilizationMap().entrySet()) { - final HystrixCommandKey key = entry.getKey(); - final HystrixCommandUtilization commandUtilization = entry.getValue(); - writeCommandUtilizationJson(json, key, commandUtilization); - - } - json.writeEndObject(); - - json.writeObjectFieldStart("threadpools"); - for (Map.Entry entry: utilization.getThreadPoolUtilizationMap().entrySet()) { - final HystrixThreadPoolKey threadPoolKey = entry.getKey(); - final HystrixThreadPoolUtilization threadPoolUtilization = entry.getValue(); - writeThreadPoolUtilizationJson(json, threadPoolKey, threadPoolUtilization); - } - json.writeEndObject(); - json.writeEndObject(); - json.close(); - - retVal = bos.toByteArray(); - } catch (Exception e) { - LangUtil.rethrowUnchecked(e); - } - - return retVal; - } - - private static void writeCommandUtilizationJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandUtilization utilization) throws IOException { - json.writeObjectFieldStart(key.name()); - json.writeNumberField("activeCount", utilization.getConcurrentCommandCount()); - json.writeEndObject(); - } - - private static void writeThreadPoolUtilizationJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolUtilization utilization) throws IOException { - json.writeObjectFieldStart(threadPoolKey.name()); - json.writeNumberField("activeCount", utilization.getCurrentActiveCount()); - json.writeNumberField("queueSize", utilization.getCurrentQueueSize()); - json.writeNumberField("corePoolSize", utilization.getCurrentCorePoolSize()); - json.writeNumberField("poolSize", utilization.getCurrentPoolSize()); - json.writeEndObject(); - } -} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixConfiguration.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixConfiguration.java new file mode 100644 index 000000000..32045ea11 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixConfiguration.java @@ -0,0 +1,256 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.reactivesocket.serialize; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.dataformat.cbor.CBORParser; +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCommandGroupKey; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixCommandProperties; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.config.HystrixCollapserConfiguration; +import com.netflix.hystrix.config.HystrixCommandConfiguration; +import com.netflix.hystrix.config.HystrixConfiguration; +import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; +import org.agrona.LangUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +public class SerialHystrixConfiguration extends SerialHystrixMetric { + + private static final Logger logger = LoggerFactory.getLogger(SerialHystrixConfiguration.class); + + public static byte[] toBytes(HystrixConfiguration config) { + byte[] retVal = null; + + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + JsonGenerator json = cborFactory.createGenerator(bos); + + json.writeStartObject(); + json.writeStringField("type", "HystrixConfig"); + json.writeObjectFieldStart("commands"); + for (Map.Entry entry: config.getCommandConfig().entrySet()) { + final HystrixCommandKey key = entry.getKey(); + final HystrixCommandConfiguration commandConfig = entry.getValue(); + writeCommandConfigJson(json, key, commandConfig); + + } + json.writeEndObject(); + + json.writeObjectFieldStart("threadpools"); + for (Map.Entry entry: config.getThreadPoolConfig().entrySet()) { + final HystrixThreadPoolKey threadPoolKey = entry.getKey(); + final HystrixThreadPoolConfiguration threadPoolConfig = entry.getValue(); + writeThreadPoolConfigJson(json, threadPoolKey, threadPoolConfig); + } + json.writeEndObject(); + + json.writeObjectFieldStart("collapsers"); + for (Map.Entry entry: config.getCollapserConfig().entrySet()) { + final HystrixCollapserKey collapserKey = entry.getKey(); + final HystrixCollapserConfiguration collapserConfig = entry.getValue(); + writeCollapserConfigJson(json, collapserKey, collapserConfig); + } + json.writeEndObject(); + json.writeEndObject(); + json.close(); + + + retVal = bos.toByteArray(); + } catch (Exception e) { + LangUtil.rethrowUnchecked(e); + } + + return retVal; + } + + public static HystrixConfiguration fromByteBuffer(ByteBuffer bb) { + byte[] byteArray = new byte[bb.remaining()]; + bb.get(byteArray); + + Map commandConfigMap = new HashMap<>(); + Map threadPoolConfigMap = new HashMap<>(); + Map collapserConfigMap = new HashMap<>(); + + try { + CBORParser parser = cborFactory.createParser(byteArray); + JsonNode rootNode = mapper.readTree(parser); + + Iterator> commands = rootNode.path("commands").fields(); + Iterator> threadPools = rootNode.path("threadpools").fields(); + Iterator> collapsers = rootNode.path("collapsers").fields(); + + while (commands.hasNext()) { + Map.Entry command = commands.next(); + + JsonNode executionConfig = command.getValue().path("execution"); + + JsonNode circuitBreakerConfig = command.getValue().path("circuitBreaker"); + JsonNode metricsConfig = command.getValue().path("metrics"); + HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(command.getKey()); + HystrixCommandConfiguration commandConfig = new HystrixCommandConfiguration( + commandKey, + HystrixThreadPoolKey.Factory.asKey(command.getValue().path("threadPoolKey").asText()), + HystrixCommandGroupKey.Factory.asKey(command.getValue().path("groupKey").asText()), + new HystrixCommandConfiguration.HystrixCommandExecutionConfig( + executionConfig.path("semaphoreSize").asInt(), + HystrixCommandProperties.ExecutionIsolationStrategy.valueOf( + executionConfig.path("isolationStrategy").asText()), + executionConfig.path("threadInterruptOnTimeout").asBoolean(), + executionConfig.path("threadPoolKeyOverride").asText(), + executionConfig.path("timeoutEnabled").asBoolean(), + executionConfig.path("timeoutInMilliseconds").asInt(), + executionConfig.path("fallbackEnabled").asBoolean(), + executionConfig.path("fallbackSemaphoreSize").asInt(), + executionConfig.path("requestCacheEnabled").asBoolean(), + executionConfig.path("requestLogEnabled").asBoolean() + ), + new HystrixCommandConfiguration.HystrixCommandCircuitBreakerConfig( + circuitBreakerConfig.path("enabled").asBoolean(), + circuitBreakerConfig.path("errorPercentageThreshold").asInt(), + circuitBreakerConfig.path("isForcedClosed").asBoolean(), + circuitBreakerConfig.path("isForcedOpen").asBoolean(), + circuitBreakerConfig.path("requestVolumeThreshold").asInt(), + circuitBreakerConfig.path("sleepInMilliseconds").asInt() + ), + new HystrixCommandConfiguration.HystrixCommandMetricsConfig( + metricsConfig.path("healthBucketSizeInMs").asInt(), + metricsConfig.path("percentileEnabled").asBoolean(), + metricsConfig.path("percentileBucketCount").asInt(), + metricsConfig.path("percentileBucketSizeInMilliseconds").asInt(), + metricsConfig.path("counterBucketCount").asInt(), + metricsConfig.path("counterBucketSizeInMilliseconds").asInt() + ) + ); + + commandConfigMap.put(commandKey, commandConfig); + } + + while (threadPools.hasNext()) { + Map.Entry threadPool = threadPools.next(); + HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey(threadPool.getKey()); + HystrixThreadPoolConfiguration threadPoolConfig = new HystrixThreadPoolConfiguration( + threadPoolKey, + threadPool.getValue().path("coreSize").asInt(), + threadPool.getValue().path("maxQueueSize").asInt(), + threadPool.getValue().path("queueRejectionThreshold").asInt(), + threadPool.getValue().path("keepAliveTimeInMinutes").asInt(), + threadPool.getValue().path("counterBucketCount").asInt(), + threadPool.getValue().path("counterBucketSizeInMilliseconds").asInt() + ); + threadPoolConfigMap.put(threadPoolKey, threadPoolConfig); + } + + while (collapsers.hasNext()) { + Map.Entry collapser = collapsers.next(); + HystrixCollapserKey collapserKey = HystrixCollapserKey.Factory.asKey(collapser.getKey()); + JsonNode metricsConfig = collapser.getValue().path("metrics"); + HystrixCollapserConfiguration collapserConfig = new HystrixCollapserConfiguration( + collapserKey, + collapser.getValue().path("maxRequestsInBatch").asInt(), + collapser.getValue().path("timerDelayInMilliseconds").asInt(), + collapser.getValue().path("requestCacheEnabled").asBoolean(), + new HystrixCollapserConfiguration.CollapserMetricsConfig( + metricsConfig.path("percentileBucketCount").asInt(), + metricsConfig.path("percentileBucketSizeInMilliseconds").asInt(), + metricsConfig.path("percentileEnabled").asBoolean(), + metricsConfig.path("counterBucketCount").asInt(), + metricsConfig.path("counterBucketSizeInMilliseconds").asInt() + ) + ); + collapserConfigMap.put(collapserKey, collapserConfig); + } + } catch (IOException ioe) { + logger.error("IO Exception during deserialization of HystrixConfiguration : " + ioe); + } + return new HystrixConfiguration(commandConfigMap, threadPoolConfigMap, collapserConfigMap); + } + + private static void writeCommandConfigJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandConfiguration commandConfig) throws IOException { + json.writeObjectFieldStart(key.name()); + json.writeStringField("threadPoolKey", commandConfig.getThreadPoolKey().name()); + json.writeStringField("groupKey", commandConfig.getGroupKey().name()); + json.writeObjectFieldStart("execution"); + HystrixCommandConfiguration.HystrixCommandExecutionConfig executionConfig = commandConfig.getExecutionConfig(); + json.writeStringField("isolationStrategy", executionConfig.getIsolationStrategy().name()); + json.writeStringField("threadPoolKeyOverride", executionConfig.getThreadPoolKeyOverride()); + json.writeBooleanField("requestCacheEnabled", executionConfig.isRequestCacheEnabled()); + json.writeBooleanField("requestLogEnabled", executionConfig.isRequestLogEnabled()); + json.writeBooleanField("timeoutEnabled", executionConfig.isTimeoutEnabled()); + json.writeBooleanField("fallbackEnabled", executionConfig.isFallbackEnabled()); + json.writeNumberField("timeoutInMilliseconds", executionConfig.getTimeoutInMilliseconds()); + json.writeNumberField("semaphoreSize", executionConfig.getSemaphoreMaxConcurrentRequests()); + json.writeNumberField("fallbackSemaphoreSize", executionConfig.getFallbackMaxConcurrentRequest()); + json.writeBooleanField("threadInterruptOnTimeout", executionConfig.isThreadInterruptOnTimeout()); + json.writeEndObject(); + json.writeObjectFieldStart("metrics"); + HystrixCommandConfiguration.HystrixCommandMetricsConfig metricsConfig = commandConfig.getMetricsConfig(); + json.writeNumberField("healthBucketSizeInMs", metricsConfig.getHealthIntervalInMilliseconds()); + json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); + json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); + json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + json.writeObjectFieldStart("circuitBreaker"); + HystrixCommandConfiguration.HystrixCommandCircuitBreakerConfig circuitBreakerConfig = commandConfig.getCircuitBreakerConfig(); + json.writeBooleanField("enabled", circuitBreakerConfig.isEnabled()); + json.writeBooleanField("isForcedOpen", circuitBreakerConfig.isForceOpen()); + json.writeBooleanField("isForcedClosed", circuitBreakerConfig.isForceOpen()); + json.writeNumberField("requestVolumeThreshold", circuitBreakerConfig.getRequestVolumeThreshold()); + json.writeNumberField("errorPercentageThreshold", circuitBreakerConfig.getErrorThresholdPercentage()); + json.writeNumberField("sleepInMilliseconds", circuitBreakerConfig.getSleepWindowInMilliseconds()); + json.writeEndObject(); + json.writeEndObject(); + } + + private static void writeThreadPoolConfigJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolConfiguration threadPoolConfig) throws IOException { + json.writeObjectFieldStart(threadPoolKey.name()); + json.writeNumberField("coreSize", threadPoolConfig.getCoreSize()); + json.writeNumberField("maxQueueSize", threadPoolConfig.getMaxQueueSize()); + json.writeNumberField("queueRejectionThreshold", threadPoolConfig.getQueueRejectionThreshold()); + json.writeNumberField("keepAliveTimeInMinutes", threadPoolConfig.getKeepAliveTimeInMinutes()); + json.writeNumberField("counterBucketSizeInMilliseconds", threadPoolConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", threadPoolConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + } + + private static void writeCollapserConfigJson(JsonGenerator json, HystrixCollapserKey collapserKey, HystrixCollapserConfiguration collapserConfig) throws IOException { + json.writeObjectFieldStart(collapserKey.name()); + json.writeNumberField("maxRequestsInBatch", collapserConfig.getMaxRequestsInBatch()); + json.writeNumberField("timerDelayInMilliseconds", collapserConfig.getTimerDelayInMilliseconds()); + json.writeBooleanField("requestCacheEnabled", collapserConfig.isRequestCacheEnabled()); + json.writeObjectFieldStart("metrics"); + HystrixCollapserConfiguration.CollapserMetricsConfig metricsConfig = collapserConfig.getCollapserMetricsConfig(); + json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); + json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); + json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + json.writeEndObject(); + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixDashboardData.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixDashboardData.java new file mode 100644 index 000000000..277f344b6 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixDashboardData.java @@ -0,0 +1,271 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.reactivesocket.serialize; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.hystrix.HystrixCircuitBreaker; +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCollapserMetrics; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixCommandMetrics; +import com.netflix.hystrix.HystrixCommandProperties; +import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.HystrixThreadPoolMetrics; +import com.netflix.hystrix.contrib.reactivesocket.HystrixDashboardStream; +import org.agrona.LangUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.function.Supplier; + +public class SerialHystrixDashboardData extends SerialHystrixMetric { + + private static final Logger logger = LoggerFactory.getLogger(SerialHystrixDashboardData.class); + + public static byte[] toBytes(HystrixDashboardStream.DashboardData dashboardData) { + byte[] retVal = null; + + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + JsonGenerator json = cborFactory.createGenerator(bos); + + json.writeStartArray(); + + for (HystrixCommandMetrics commandMetrics: dashboardData.getCommandMetrics()) { + writeCommandMetrics(commandMetrics, json); + } + + for (HystrixThreadPoolMetrics threadPoolMetrics: dashboardData.getThreadPoolMetrics()) { + writeThreadPoolMetrics(threadPoolMetrics, json); + } + + for (HystrixCollapserMetrics collapserMetrics: dashboardData.getCollapserMetrics()) { + writeCollapserMetrics(collapserMetrics, json); + } + + json.writeEndArray(); + + json.close(); + retVal = bos.toByteArray(); + + } catch (Exception e) { + LangUtil.rethrowUnchecked(e); + } + + return retVal; + } + + private static void writeCommandMetrics(HystrixCommandMetrics commandMetrics, JsonGenerator json) throws IOException { + HystrixCommandKey key = commandMetrics.getCommandKey(); + HystrixCircuitBreaker circuitBreaker = HystrixCircuitBreaker.Factory.getInstance(key); + + json.writeStartObject(); + json.writeStringField("type", "HystrixCommand"); + json.writeStringField("name", key.name()); + json.writeStringField("group", commandMetrics.getCommandGroup().name()); + json.writeNumberField("currentTime", System.currentTimeMillis()); + + // circuit breaker + if (circuitBreaker == null) { + // circuit breaker is disabled and thus never open + json.writeBooleanField("isCircuitBreakerOpen", false); + } else { + json.writeBooleanField("isCircuitBreakerOpen", circuitBreaker.isOpen()); + } + HystrixCommandMetrics.HealthCounts healthCounts = commandMetrics.getHealthCounts(); + json.writeNumberField("errorPercentage", healthCounts.getErrorPercentage()); + json.writeNumberField("errorCount", healthCounts.getErrorCount()); + json.writeNumberField("requestCount", healthCounts.getTotalRequests()); + + // rolling counters + safelyWriteNumberField(json, "rollingCountBadRequests", () -> commandMetrics.getRollingCount(HystrixEventType.BAD_REQUEST)); + safelyWriteNumberField(json, "rollingCountCollapsedRequests", () -> commandMetrics.getRollingCount(HystrixEventType.COLLAPSED)); + safelyWriteNumberField(json, "rollingCountEmit", () -> commandMetrics.getRollingCount(HystrixEventType.EMIT)); + safelyWriteNumberField(json, "rollingCountExceptionsThrown", () -> commandMetrics.getRollingCount(HystrixEventType.EXCEPTION_THROWN)); + safelyWriteNumberField(json, "rollingCountFailure", () -> commandMetrics.getRollingCount(HystrixEventType.FAILURE)); + safelyWriteNumberField(json, "rollingCountFallbackEmit", () -> commandMetrics.getRollingCount(HystrixEventType.FALLBACK_EMIT)); + safelyWriteNumberField(json, "rollingCountFallbackFailure", () -> commandMetrics.getRollingCount(HystrixEventType.FALLBACK_FAILURE)); + safelyWriteNumberField(json, "rollingCountFallbackMissing", () -> commandMetrics.getRollingCount(HystrixEventType.FALLBACK_MISSING)); + safelyWriteNumberField(json, "rollingCountFallbackRejection", () -> commandMetrics.getRollingCount(HystrixEventType.FALLBACK_REJECTION)); + safelyWriteNumberField(json, "rollingCountFallbackSuccess", () -> commandMetrics.getRollingCount(HystrixEventType.FALLBACK_SUCCESS)); + safelyWriteNumberField(json, "rollingCountResponsesFromCache", () -> commandMetrics.getRollingCount(HystrixEventType.RESPONSE_FROM_CACHE)); + safelyWriteNumberField(json, "rollingCountSemaphoreRejected", () -> commandMetrics.getRollingCount(HystrixEventType.SEMAPHORE_REJECTED)); + safelyWriteNumberField(json, "rollingCountShortCircuited", () -> commandMetrics.getRollingCount(HystrixEventType.SHORT_CIRCUITED)); + safelyWriteNumberField(json, "rollingCountSuccess", () -> commandMetrics.getRollingCount(HystrixEventType.SUCCESS)); + safelyWriteNumberField(json, "rollingCountThreadPoolRejected", () -> commandMetrics.getRollingCount(HystrixEventType.THREAD_POOL_REJECTED)); + safelyWriteNumberField(json, "rollingCountTimeout", () -> commandMetrics.getRollingCount(HystrixEventType.TIMEOUT)); + + json.writeNumberField("currentConcurrentExecutionCount", commandMetrics.getCurrentConcurrentExecutionCount()); + json.writeNumberField("rollingMaxConcurrentExecutionCount", commandMetrics.getRollingMaxConcurrentExecutions()); + + // latency percentiles + json.writeNumberField("latencyExecute_mean", commandMetrics.getExecutionTimeMean()); + json.writeObjectFieldStart("latencyExecute"); + json.writeNumberField("0", commandMetrics.getExecutionTimePercentile(0)); + json.writeNumberField("25", commandMetrics.getExecutionTimePercentile(25)); + json.writeNumberField("50", commandMetrics.getExecutionTimePercentile(50)); + json.writeNumberField("75", commandMetrics.getExecutionTimePercentile(75)); + json.writeNumberField("90", commandMetrics.getExecutionTimePercentile(90)); + json.writeNumberField("95", commandMetrics.getExecutionTimePercentile(95)); + json.writeNumberField("99", commandMetrics.getExecutionTimePercentile(99)); + json.writeNumberField("99.5", commandMetrics.getExecutionTimePercentile(99.5)); + json.writeNumberField("100", commandMetrics.getExecutionTimePercentile(100)); + json.writeEndObject(); + // + json.writeNumberField("latencyTotal_mean", commandMetrics.getTotalTimeMean()); + json.writeObjectFieldStart("latencyTotal"); + json.writeNumberField("0", commandMetrics.getTotalTimePercentile(0)); + json.writeNumberField("25", commandMetrics.getTotalTimePercentile(25)); + json.writeNumberField("50", commandMetrics.getTotalTimePercentile(50)); + json.writeNumberField("75", commandMetrics.getTotalTimePercentile(75)); + json.writeNumberField("90", commandMetrics.getTotalTimePercentile(90)); + json.writeNumberField("95", commandMetrics.getTotalTimePercentile(95)); + json.writeNumberField("99", commandMetrics.getTotalTimePercentile(99)); + json.writeNumberField("99.5", commandMetrics.getTotalTimePercentile(99.5)); + json.writeNumberField("100", commandMetrics.getTotalTimePercentile(100)); + json.writeEndObject(); + + // property values for reporting what is actually seen by the command rather than what was set somewhere + HystrixCommandProperties commandProperties = commandMetrics.getProperties(); + + json.writeNumberField("propertyValue_circuitBreakerRequestVolumeThreshold", commandProperties.circuitBreakerRequestVolumeThreshold().get()); + json.writeNumberField("propertyValue_circuitBreakerSleepWindowInMilliseconds", commandProperties.circuitBreakerSleepWindowInMilliseconds().get()); + json.writeNumberField("propertyValue_circuitBreakerErrorThresholdPercentage", commandProperties.circuitBreakerErrorThresholdPercentage().get()); + json.writeBooleanField("propertyValue_circuitBreakerForceOpen", commandProperties.circuitBreakerForceOpen().get()); + json.writeBooleanField("propertyValue_circuitBreakerForceClosed", commandProperties.circuitBreakerForceClosed().get()); + json.writeBooleanField("propertyValue_circuitBreakerEnabled", commandProperties.circuitBreakerEnabled().get()); + + json.writeStringField("propertyValue_executionIsolationStrategy", commandProperties.executionIsolationStrategy().get().name()); + json.writeNumberField("propertyValue_executionIsolationThreadTimeoutInMilliseconds", commandProperties.executionTimeoutInMilliseconds().get()); + json.writeNumberField("propertyValue_executionTimeoutInMilliseconds", commandProperties.executionTimeoutInMilliseconds().get()); + json.writeBooleanField("propertyValue_executionIsolationThreadInterruptOnTimeout", commandProperties.executionIsolationThreadInterruptOnTimeout().get()); + json.writeStringField("propertyValue_executionIsolationThreadPoolKeyOverride", commandProperties.executionIsolationThreadPoolKeyOverride().get()); + json.writeNumberField("propertyValue_executionIsolationSemaphoreMaxConcurrentRequests", commandProperties.executionIsolationSemaphoreMaxConcurrentRequests().get()); + json.writeNumberField("propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests", commandProperties.fallbackIsolationSemaphoreMaxConcurrentRequests().get()); + + /* + * The following are commented out as these rarely change and are verbose for streaming for something people don't change. + * We could perhaps allow a property or request argument to include these. + */ + + // json.put("propertyValue_metricsRollingPercentileEnabled", commandProperties.metricsRollingPercentileEnabled().get()); + // json.put("propertyValue_metricsRollingPercentileBucketSize", commandProperties.metricsRollingPercentileBucketSize().get()); + // json.put("propertyValue_metricsRollingPercentileWindow", commandProperties.metricsRollingPercentileWindowInMilliseconds().get()); + // json.put("propertyValue_metricsRollingPercentileWindowBuckets", commandProperties.metricsRollingPercentileWindowBuckets().get()); + // json.put("propertyValue_metricsRollingStatisticalWindowBuckets", commandProperties.metricsRollingStatisticalWindowBuckets().get()); + json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", commandProperties.metricsRollingStatisticalWindowInMilliseconds().get()); + + json.writeBooleanField("propertyValue_requestCacheEnabled", commandProperties.requestCacheEnabled().get()); + json.writeBooleanField("propertyValue_requestLogEnabled", commandProperties.requestLogEnabled().get()); + + json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster + json.writeStringField("threadPool", commandMetrics.getThreadPoolKey().name()); + + json.writeEndObject(); + } + + private static void writeThreadPoolMetrics(HystrixThreadPoolMetrics threadPoolMetrics, JsonGenerator json) throws IOException { + HystrixThreadPoolKey key = threadPoolMetrics.getThreadPoolKey(); + + json.writeStartObject(); + + json.writeStringField("type", "HystrixThreadPool"); + json.writeStringField("name", key.name()); + json.writeNumberField("currentTime", System.currentTimeMillis()); + + json.writeNumberField("currentActiveCount", threadPoolMetrics.getCurrentActiveCount().intValue()); + json.writeNumberField("currentCompletedTaskCount", threadPoolMetrics.getCurrentCompletedTaskCount().longValue()); + json.writeNumberField("currentCorePoolSize", threadPoolMetrics.getCurrentCorePoolSize().intValue()); + json.writeNumberField("currentLargestPoolSize", threadPoolMetrics.getCurrentLargestPoolSize().intValue()); + json.writeNumberField("currentMaximumPoolSize", threadPoolMetrics.getCurrentMaximumPoolSize().intValue()); + json.writeNumberField("currentPoolSize", threadPoolMetrics.getCurrentPoolSize().intValue()); + json.writeNumberField("currentQueueSize", threadPoolMetrics.getCurrentQueueSize().intValue()); + json.writeNumberField("currentTaskCount", threadPoolMetrics.getCurrentTaskCount().longValue()); + safelyWriteNumberField(json, "rollingCountThreadsExecuted", () -> threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.EXECUTED)); + json.writeNumberField("rollingMaxActiveThreads", threadPoolMetrics.getRollingMaxActiveThreads()); + safelyWriteNumberField(json, "rollingCountCommandRejections", () -> threadPoolMetrics.getRollingCount(HystrixEventType.ThreadPool.REJECTED)); + + json.writeNumberField("propertyValue_queueSizeRejectionThreshold", threadPoolMetrics.getProperties().queueSizeRejectionThreshold().get()); + json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", threadPoolMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get()); + + json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster + + json.writeEndObject(); + } + + private static void writeCollapserMetrics(HystrixCollapserMetrics collapserMetrics, JsonGenerator json) throws IOException { + HystrixCollapserKey key = collapserMetrics.getCollapserKey(); + + json.writeStartObject(); + + json.writeStringField("type", "HystrixCollapser"); + json.writeStringField("name", key.name()); + json.writeNumberField("currentTime", System.currentTimeMillis()); + + safelyWriteNumberField(json, "rollingCountRequestsBatched", () -> collapserMetrics.getRollingCount(HystrixEventType.Collapser.ADDED_TO_BATCH)); + safelyWriteNumberField(json, "rollingCountBatches", () -> collapserMetrics.getRollingCount(HystrixEventType.Collapser.BATCH_EXECUTED)); + safelyWriteNumberField(json, "rollingCountResponsesFromCache", () -> collapserMetrics.getRollingCount(HystrixEventType.Collapser.RESPONSE_FROM_CACHE)); + + // batch size percentiles + json.writeNumberField("batchSize_mean", collapserMetrics.getBatchSizeMean()); + json.writeObjectFieldStart("batchSize"); + json.writeNumberField("25", collapserMetrics.getBatchSizePercentile(25)); + json.writeNumberField("50", collapserMetrics.getBatchSizePercentile(50)); + json.writeNumberField("75", collapserMetrics.getBatchSizePercentile(75)); + json.writeNumberField("90", collapserMetrics.getBatchSizePercentile(90)); + json.writeNumberField("95", collapserMetrics.getBatchSizePercentile(95)); + json.writeNumberField("99", collapserMetrics.getBatchSizePercentile(99)); + json.writeNumberField("99.5", collapserMetrics.getBatchSizePercentile(99.5)); + json.writeNumberField("100", collapserMetrics.getBatchSizePercentile(100)); + json.writeEndObject(); + + // shard size percentiles (commented-out for now) + //json.writeNumberField("shardSize_mean", collapserMetrics.getShardSizeMean()); + //json.writeObjectFieldStart("shardSize"); + //json.writeNumberField("25", collapserMetrics.getShardSizePercentile(25)); + //json.writeNumberField("50", collapserMetrics.getShardSizePercentile(50)); + //json.writeNumberField("75", collapserMetrics.getShardSizePercentile(75)); + //json.writeNumberField("90", collapserMetrics.getShardSizePercentile(90)); + //json.writeNumberField("95", collapserMetrics.getShardSizePercentile(95)); + //json.writeNumberField("99", collapserMetrics.getShardSizePercentile(99)); + //json.writeNumberField("99.5", collapserMetrics.getShardSizePercentile(99.5)); + //json.writeNumberField("100", collapserMetrics.getShardSizePercentile(100)); + //json.writeEndObject(); + + //json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", collapserMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get()); + json.writeBooleanField("propertyValue_requestCacheEnabled", collapserMetrics.getProperties().requestCacheEnabled().get()); + json.writeNumberField("propertyValue_maxRequestsInBatch", collapserMetrics.getProperties().maxRequestsInBatch().get()); + json.writeNumberField("propertyValue_timerDelayInMilliseconds", collapserMetrics.getProperties().timerDelayInMilliseconds().get()); + + json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster + + json.writeEndObject(); + } + + protected static void safelyWriteNumberField(JsonGenerator json, String name, Supplier metricGenerator) throws IOException { + try { + json.writeNumberField(name, metricGenerator.get()); + } catch (NoSuchFieldError error) { + logger.error("While publishing Hystrix metrics stream, error looking up eventType for : " + name + ". Please check that all Hystrix versions are the same!"); + json.writeNumberField(name, 0L); + } + } + + +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixMetric.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixMetric.java new file mode 100644 index 000000000..233341ecd --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixMetric.java @@ -0,0 +1,63 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.reactivesocket.serialize; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.cbor.CBORFactory; +import com.fasterxml.jackson.dataformat.cbor.CBORParser; +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class SerialHystrixMetric { + protected final static CBORFactory cborFactory = new CBORFactory(); + protected final static ObjectMapper mapper = new ObjectMapper(); + protected final static Logger logger = LoggerFactory.getLogger(SerialHystrixMetric.class); + + public static Payload toPayload(byte[] byteArray) { + return new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer.wrap(byteArray); + } + + @Override + public ByteBuffer getMetadata() { + return Frame.NULL_BYTEBUFFER; + } + }; + } + + public static String fromByteBufferToString(ByteBuffer bb) { + byte[] byteArray = new byte[bb.remaining()]; + bb.get(byteArray); + + try { + CBORParser parser = cborFactory.createParser(byteArray); + JsonNode rootNode = mapper.readTree(parser); + + return rootNode.toString(); + } catch (IOException ioe) { + logger.error("IO Exception during deserialization of ByteBuffer of Hystrix Metric : " + ioe); + return ""; + } + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixRequestEvents.java similarity index 53% rename from hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java rename to hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixRequestEvents.java index 7cf85a3d8..3f177d776 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/requests/HystrixRequestEventsStream.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixRequestEvents.java @@ -1,63 +1,47 @@ -package com.netflix.hystrix.contrib.reactivesocket.requests; +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.reactivesocket.serialize; import com.fasterxml.jackson.core.JsonGenerator; import com.netflix.hystrix.ExecutionResult; import com.netflix.hystrix.HystrixEventType; -import com.netflix.hystrix.contrib.reactivesocket.BasePayloadSupplier; import com.netflix.hystrix.metric.HystrixRequestEvents; -import io.reactivesocket.Frame; -import io.reactivesocket.Payload; import org.agrona.LangUtil; -import rx.Observable; -import rx.schedulers.Schedulers; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -public class HystrixRequestEventsStream extends BasePayloadSupplier { - private static HystrixRequestEventsStream INSTANCE = new HystrixRequestEventsStream(); +public class SerialHystrixRequestEvents extends SerialHystrixMetric { - private HystrixRequestEventsStream() { - super(); - - com.netflix.hystrix.metric.HystrixRequestEventsStream.getInstance() - .observe() - .observeOn(Schedulers.computation()) - .map(this::getPayloadData) - .map(b -> - new Payload() { - @Override - public ByteBuffer getData() { - return ByteBuffer.wrap(b); - } - - @Override - public ByteBuffer getMetadata() { - return Frame.NULL_BYTEBUFFER; - } - }) - .subscribe(subject); - } - - public static HystrixRequestEventsStream getInstance() { - return INSTANCE; - } - - @Override - public Observable get() { - return subject; - } - - public byte[] getPayloadData(HystrixRequestEvents requestEvents) { + public static byte[] toBytes(HystrixRequestEvents requestEvents) { byte[] retVal = null; try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); - JsonGenerator json = jsonFactory.createGenerator(bos); - writeRequestAsJson(json, requestEvents); + JsonGenerator json = cborFactory.createGenerator(bos); + + json.writeStartArray(); + + for (Map.Entry> entry: requestEvents.getExecutionsMappedToLatencies().entrySet()) { + convertExecutionToJson(json, entry.getKey(), entry.getValue()); + } + + json.writeEndArray(); json.close(); retVal = bos.toByteArray(); @@ -68,17 +52,7 @@ public byte[] getPayloadData(HystrixRequestEvents requestEvents) { return retVal; } - private void writeRequestAsJson(JsonGenerator json, HystrixRequestEvents request) throws IOException { - json.writeStartArray(); - - for (Map.Entry> entry: request.getExecutionsMappedToLatencies().entrySet()) { - convertExecutionToJson(json, entry.getKey(), entry.getValue()); - } - - json.writeEndArray(); - } - - private void convertExecutionToJson(JsonGenerator json, HystrixRequestEvents.ExecutionSignature executionSignature, List latencies) throws IOException { + private static void convertExecutionToJson(JsonGenerator json, HystrixRequestEvents.ExecutionSignature executionSignature, List latencies) throws IOException { json.writeStartObject(); json.writeStringField("name", executionSignature.getCommandName()); json.writeArrayFieldStart("events"); @@ -115,4 +89,4 @@ private void convertExecutionToJson(JsonGenerator json, HystrixRequestEvents.Exe } json.writeEndObject(); } -} \ No newline at end of file +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixUtilization.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixUtilization.java new file mode 100644 index 000000000..1018c2fe0 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/serialize/SerialHystrixUtilization.java @@ -0,0 +1,129 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.reactivesocket.serialize; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.dataformat.cbor.CBORParser; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.metric.sample.HystrixCommandUtilization; +import com.netflix.hystrix.metric.sample.HystrixThreadPoolUtilization; +import com.netflix.hystrix.metric.sample.HystrixUtilization; +import org.agrona.LangUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +public class SerialHystrixUtilization extends SerialHystrixMetric { + + private final static Logger logger = LoggerFactory.getLogger(SerialHystrixUtilization.class); + + public static byte[] toBytes(HystrixUtilization utilization) { + byte[] retVal = null; + + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + JsonGenerator json = cborFactory.createGenerator(bos); + + json.writeStartObject(); + json.writeStringField("type", "HystrixUtilization"); + json.writeObjectFieldStart("commands"); + for (Map.Entry entry: utilization.getCommandUtilizationMap().entrySet()) { + final HystrixCommandKey key = entry.getKey(); + final HystrixCommandUtilization commandUtilization = entry.getValue(); + writeCommandUtilizationJson(json, key, commandUtilization); + + } + json.writeEndObject(); + + json.writeObjectFieldStart("threadpools"); + for (Map.Entry entry: utilization.getThreadPoolUtilizationMap().entrySet()) { + final HystrixThreadPoolKey threadPoolKey = entry.getKey(); + final HystrixThreadPoolUtilization threadPoolUtilization = entry.getValue(); + writeThreadPoolUtilizationJson(json, threadPoolKey, threadPoolUtilization); + } + json.writeEndObject(); + json.writeEndObject(); + json.close(); + + retVal = bos.toByteArray(); + } catch (Exception e) { + LangUtil.rethrowUnchecked(e); + } + + return retVal; + } + + public static HystrixUtilization fromByteBuffer(ByteBuffer bb) { + byte[] byteArray = new byte[bb.remaining()]; + bb.get(byteArray); + + Map commandUtilizationMap = new HashMap<>(); + Map threadPoolUtilizationMap = new HashMap<>(); + + try { + CBORParser parser = cborFactory.createParser(byteArray); + JsonNode rootNode = mapper.readTree(parser); + + Iterator> commands = rootNode.path("commands").fields(); + Iterator> threadPools = rootNode.path("threadpools").fields(); + + while (commands.hasNext()) { + Map.Entry command = commands.next(); + HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(command.getKey()); + HystrixCommandUtilization commandUtilization = new HystrixCommandUtilization(command.getValue().path("activeCount").asInt()); + commandUtilizationMap.put(commandKey, commandUtilization); + } + + while (threadPools.hasNext()) { + Map.Entry threadPool = threadPools.next(); + HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey(threadPool.getKey()); + HystrixThreadPoolUtilization threadPoolUtilization = new HystrixThreadPoolUtilization( + threadPool.getValue().path("activeCount").asInt(), + threadPool.getValue().path("corePoolSize").asInt(), + threadPool.getValue().path("poolSize").asInt(), + threadPool.getValue().path("queueSize").asInt() + ); + threadPoolUtilizationMap.put(threadPoolKey, threadPoolUtilization); + } + } catch (IOException ioe) { + logger.error("IO Exception during desrialization of HystrixUtilization : " + ioe); + } + return new HystrixUtilization(commandUtilizationMap, threadPoolUtilizationMap); + } + + private static void writeCommandUtilizationJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandUtilization utilization) throws IOException { + json.writeObjectFieldStart(key.name()); + json.writeNumberField("activeCount", utilization.getConcurrentCommandCount()); + json.writeEndObject(); + } + + private static void writeThreadPoolUtilizationJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolUtilization utilization) throws IOException { + json.writeObjectFieldStart(threadPoolKey.name()); + json.writeNumberField("activeCount", utilization.getCurrentActiveCount()); + json.writeNumberField("queueSize", utilization.getCurrentQueueSize()); + json.writeNumberField("corePoolSize", utilization.getCurrentCorePoolSize()); + json.writeNumberField("poolSize", utilization.getCurrentPoolSize()); + json.writeEndObject(); + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java index 8c7a55855..d1fa7d59e 100644 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java @@ -1,3 +1,18 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.netflix.hystrix.contrib.reactivesocket; @@ -16,6 +31,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class EventStreamRequestHandlerTest { @@ -26,7 +42,7 @@ public void testEventStreamRequestN() throws Exception { public ByteBuffer getData() { return ByteBuffer .allocate(BitUtil.SIZE_OF_INT) - .putInt(EventStreamEnum.METRICS_STREAM.getTypeId()); + .putInt(EventStreamEnum.GENERAL_DASHBOARD_STREAM.getTypeId()); } @Override @@ -105,7 +121,7 @@ public void testEventStreamFireHose() throws Exception { public ByteBuffer getData() { return ByteBuffer .allocate(BitUtil.SIZE_OF_INT) - .putInt(EventStreamEnum.METRICS_STREAM.getTypeId()); + .putInt(EventStreamEnum.GENERAL_DASHBOARD_STREAM.getTypeId()); } @Override @@ -123,41 +139,43 @@ public ByteBuffer getMetadata() { }, 0, 1, TimeUnit.MILLISECONDS); CountDownLatch latch = new CountDownLatch(1); - CountDownLatch latch1 = new CountDownLatch(25); + CountDownLatch latch1 = new CountDownLatch(15); AtomicReference subscriptionAtomicReference = new AtomicReference<>(); EventStreamRequestHandler handler = new EventStreamRequestHandler(); Publisher payloadPublisher = handler.handleSubscription(payload); + AtomicInteger i = new AtomicInteger(0); + payloadPublisher .subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - subscriptionAtomicReference.set(s); - latch.countDown(); - } + @Override + public void onSubscribe(Subscription s) { + subscriptionAtomicReference.set(s); + latch.countDown(); + } - @Override - public void onNext(Payload payload) { - ByteBuffer data = payload.getData(); - String s = new String(data.array()); + @Override + public void onNext(Payload payload) { + ByteBuffer data = payload.getData(); + String s = new String(data.array()); - System.out.println(s); + System.out.println(System.currentTimeMillis() + " : " + i.incrementAndGet()); - latch1.countDown(); - } + latch1.countDown(); + } - @Override - public void onError(Throwable t) { + @Override + public void onError(Throwable t) { - } + } - @Override - public void onComplete() { + @Override + public void onComplete() { - } - }); + } + }); latch.await(); diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java new file mode 100644 index 000000000..cb2c4071a --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java @@ -0,0 +1,597 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.reactivesocket; + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; +import io.reactivesocket.Payload; +import org.agrona.BitUtil; +import org.junit.Before; +import org.junit.Test; +import rx.Observable; +import rx.Subscriber; +import rx.Subscription; +import rx.schedulers.Schedulers; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +public class EventStreamTest extends HystrixStreamTest { + + EventStream stream; + + @Before + public void init() { + stream = new EventStream(Observable.interval(10, TimeUnit.MILLISECONDS) + .map(ts -> { + Payload p = new Payload() { + @Override + public ByteBuffer getData() { + return ByteBuffer.allocate(BitUtil.SIZE_OF_INT * 2) + .putInt(0, 1) + .putInt(BitUtil.SIZE_OF_INT, 2); + } + + @Override + public ByteBuffer getMetadata() { + return null; + } + }; + + return p; + + }) + ); + } + + @Test + public void testConfigStreamHasData() throws Exception { + final AtomicBoolean hasBytes = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + final int NUM = 10; + + EventStream.getInstance(EventStreamEnum.CONFIG_STREAM).get() + .take(NUM) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Config OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Config OnError : " + e); + latch.countDown(); + } + + @Override + public void onNext(Payload payload) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Config OnNext w bytes : " + payload.getData().remaining()); + if (payload.getData().remaining() > 0) { + hasBytes.set(true); + } + } + }); + + for (int i = 0; i < NUM; i++) { + HystrixCommand cmd = new SyntheticBlockingCommand(); + cmd.execute(); + } + + assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(hasBytes.get()); + } + + @Test + public void testUtilizationStreamHasData() throws Exception { + final AtomicBoolean hasBytes = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + final int NUM = 10; + + EventStream.getInstance(EventStreamEnum.UTILIZATION_STREAM).get() + .take(NUM) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Utilization OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Utilization OnError : " + e); + latch.countDown(); + } + + @Override + public void onNext(Payload payload) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Utilization OnNext w bytes : " + payload.getData().remaining()); + if (payload.getData().remaining() > 0) { + hasBytes.set(true); + } + } + }); + + for (int i = 0; i < NUM; i++) { + HystrixCommand cmd = new SyntheticBlockingCommand(); + cmd.execute(); + } + + assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(hasBytes.get()); + } + + @Test + public void testRequestEventStreamHasData() throws Exception { + final AtomicBoolean hasBytes = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + final int NUM = 10; + + EventStream.getInstance(EventStreamEnum.REQUEST_EVENT_STREAM).get() + .take(NUM) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Request Event OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Request Event OnError : " + e); + latch.countDown(); + } + + @Override + public void onNext(Payload payload) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Request Event OnNext w bytes : " + payload.getData().remaining()); + if (payload.getData().remaining() > 0) { + hasBytes.set(true); + } + } + }); + + for (int i = 0; i < NUM; i++) { + HystrixRequestContext requestContext = HystrixRequestContext.initializeContext(); + HystrixCommand cmd = new SyntheticBlockingCommand(); + cmd.execute(); + requestContext.close(); + } + + assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(hasBytes.get()); + } + + @Test + public void testDashboardStreamHasData() throws Exception { + final AtomicBoolean hasBytes = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + final int NUM = 10; + + EventStream.getInstance(EventStreamEnum.GENERAL_DASHBOARD_STREAM).get() + .take(NUM) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Dashboard OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Dashboard OnError : " + e); + latch.countDown(); + } + + @Override + public void onNext(Payload payload) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Dashboard OnNext w bytes : " + payload.getData().remaining()); + if (payload.getData().remaining() > 0) { + hasBytes.set(true); + } + } + }); + + for (int i = 0; i < NUM; i++) { + HystrixCommand cmd = new SyntheticBlockingCommand(); + cmd.execute(); + } + + assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(hasBytes.get()); + } + + @Test + public void testSharedSourceStream() throws Exception { + CountDownLatch latch = new CountDownLatch(2); + AtomicReference p1 = new AtomicReference<>(null); + AtomicReference p2 = new AtomicReference<>(null); + + Subscription s1 = stream + .get() + .take(10) + .observeOn(Schedulers.computation()) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e); + latch.countDown(); + } + + @Override + public void onNext(Payload payload) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + payload.getData().remaining()); + p1.set(payload); + } + }); + + Subscription s2 = stream + .get() + .take(10) + .observeOn(Schedulers.computation()) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e); + latch.countDown(); + } + + @Override + public void onNext(Payload payload) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + payload.getData().remaining()); + p2.set(payload); + } + }); + + for (int i = 0; i < 10; i++) { + HystrixCommand cmd = new SyntheticBlockingCommand(); + cmd.execute(); + } + + assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); + assertNotNull(p1.get()); + assertEquals(p1.get(), p2.get()); //this is intentionally checking object equality (not value). + //we should be getting the same object from both streams. this ensures that multiple subscribers don't induce extra work + } + + @Test + public void testTwoSubscribersOneUnsubscribes() throws Exception { + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + AtomicInteger payloads1 = new AtomicInteger(0); + AtomicInteger payloads2 = new AtomicInteger(0); + + Subscription s1 = stream + .get() + .take(100) + .observeOn(Schedulers.computation()) + .doOnUnsubscribe(latch1::countDown) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted"); + latch1.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e); + latch1.countDown(); + } + + @Override + public void onNext(Payload payload) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + payload.getData().remaining()); + payloads1.incrementAndGet(); + } + }); + + Subscription s2 = stream + .get() + .take(100) + .observeOn(Schedulers.computation()) + .doOnUnsubscribe(latch2::countDown) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted"); + latch2.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e); + latch2.countDown(); + } + + @Override + public void onNext(Payload payload) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + payload.getData().remaining() + " : " + payloads2.get()); + payloads2.incrementAndGet(); + } + }); + //execute 1 command, then unsubscribe from first stream. then execute the rest + for (int i = 0; i < 5; i++) { + HystrixCommand cmd = new SyntheticBlockingCommand(); + cmd.execute(); + if (i == 1) { + s1.unsubscribe(); + } + } + assertTrue(stream.isSourceCurrentlySubscribed()); //only 1/2 subscriptions has been cancelled + + assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS)); + System.out.println("s1 got : " + payloads1.get() + ", s2 got : " + payloads2.get()); + assertTrue("s1 got data", payloads1.get() > 0); + assertTrue("s2 got data", payloads2.get() > 0); + assertTrue("s1 got less data than s2", payloads2.get() > payloads1.get()); + } + + @Test + public void testTwoSubscribersBothUnsubscribe() throws Exception { + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + AtomicInteger payloads1 = new AtomicInteger(0); + AtomicInteger payloads2 = new AtomicInteger(0); + + Subscription s1 = stream + .get() + .take(100) + .observeOn(Schedulers.computation()) + .doOnUnsubscribe(latch1::countDown) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted"); + latch1.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e); + latch1.countDown(); + } + + @Override + public void onNext(Payload payload) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + payload.getData().remaining()); + payloads1.incrementAndGet(); + } + }); + + Subscription s2 = stream + .get() + .take(100) + .observeOn(Schedulers.computation()) + .doOnUnsubscribe(latch2::countDown) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted"); + latch2.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e); + latch2.countDown(); + } + + @Override + public void onNext(Payload payload) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + payload.getData().remaining()); + payloads2.incrementAndGet(); + } + }); + //execute 1 command, then unsubscribe from both streams. then execute the rest + for (int i = 0; i < 5; i++) { + HystrixCommand cmd = new SyntheticBlockingCommand(); + cmd.execute(); + if (i == 1) { + s1.unsubscribe(); + s2.unsubscribe(); + } + } + assertFalse(stream.isSourceCurrentlySubscribed()); //both subscriptions have been cancelled + + System.out.println("s1 got : " + payloads1.get() + ", s2 got : " + payloads2.get()); + assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS)); + assertTrue("s1 got data", payloads1.get() > 0); + assertTrue("s2 got data", payloads2.get() > 0); + } + + @Test + public void testTwoSubscribersBothUnsubscribeThenNewSubscriber() throws Exception { + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + AtomicInteger payloads1 = new AtomicInteger(0); + AtomicInteger payloads2 = new AtomicInteger(0); + + Subscription s1 = stream + .get() + .take(100) + .observeOn(Schedulers.computation()) + .doOnUnsubscribe(latch1::countDown) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted"); + latch1.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e); + latch1.countDown(); + } + + @Override + public void onNext(Payload payload) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + payload.getData().remaining()); + payloads1.incrementAndGet(); + } + }); + + Subscription s2 = stream + .get() + .take(100) + .observeOn(Schedulers.computation()) + .doOnUnsubscribe(latch2::countDown) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted"); + latch2.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e); + latch2.countDown(); + } + + @Override + public void onNext(Payload payload) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + payload.getData().remaining()); + payloads2.incrementAndGet(); + } + }); + //execute 1 command, then unsubscribe from both streams. then execute the rest + for (int i = 0; i < 5; i++) { + HystrixCommand cmd = new SyntheticBlockingCommand(); + cmd.execute(); + if (i == 1) { + s1.unsubscribe(); + s2.unsubscribe(); + } + } + + System.out.println("s1 got : " + payloads1.get() + ", s2 got : " + payloads2.get()); + assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS)); + assertTrue("s1 got data", payloads1.get() > 0); + assertTrue("s2 got data", payloads2.get() > 0); + + final int NUM_DATA_REQUESTED = 100; + CountDownLatch latch3 = new CountDownLatch(1); + AtomicInteger payloads3 = new AtomicInteger(0); + Subscription s3 = stream + .get() + .take(NUM_DATA_REQUESTED) + .observeOn(Schedulers.computation()) + .doOnUnsubscribe(latch3::countDown) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 3 OnCompleted"); + latch3.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 3 OnError : " + e); + latch3.countDown(); + } + + @Override + public void onNext(Payload payload) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 3 OnNext : " + payload.getData().remaining()); + payloads3.incrementAndGet(); + } + }); + + assertTrue(stream.isSourceCurrentlySubscribed()); //should be doing work when re-subscribed + + assertTrue(latch3.await(10000, TimeUnit.MILLISECONDS)); + assertEquals(NUM_DATA_REQUESTED, payloads3.get()); + } + + @Test + public void testTwoSubscribersOneSlow() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean foundError = new AtomicBoolean(false); + + Observable fast = stream + .get() + .observeOn(Schedulers.newThread()); + Observable slow = stream + .get() + .observeOn(Schedulers.newThread()) + .map(n -> { + try { + System.out.println("Sleeping on thread : " + Thread.currentThread().getName()); + Thread.sleep(100); + return n; + } catch (InterruptedException ex) { + return n; + } + }); + + Observable checkZippedEqual = Observable.zip(fast, slow, (payload, payload2) -> payload == payload2); + + Subscription s1 = checkZippedEqual + .take(10000) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnError : " + e); + e.printStackTrace(); + foundError.set(true); + latch.countDown(); + } + + @Override + public void onNext(Boolean b) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnNext : " + b); + } + }); + + for (int i = 0; i < 50; i++) { + HystrixCommand cmd = new SyntheticBlockingCommand(); + cmd.execute(); + } + + latch.await(5000, TimeUnit.MILLISECONDS); + assertFalse(foundError.get()); + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/HystrixDashboardStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/HystrixDashboardStreamTest.java new file mode 100644 index 000000000..99e3e89c6 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/HystrixDashboardStreamTest.java @@ -0,0 +1,272 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.reactivesocket; + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixCommandMetrics; +import org.junit.Before; +import org.junit.Test; +import rx.Observable; +import rx.Subscriber; +import rx.Subscription; +import rx.functions.Actions; +import rx.schedulers.Schedulers; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class HystrixDashboardStreamTest extends HystrixStreamTest { + + HystrixDashboardStream stream; + + @Before + public void init() { + stream = HystrixDashboardStream.getNonSingletonInstanceOnlyUsedInUnitTests(10); + } + + @Test + public void testStreamHasData() throws Exception { + final AtomicBoolean commandShowsUp = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + final int NUM = 10; + + for (int i = 0; i < 2; i++) { + HystrixCommand cmd = new SyntheticBlockingCommand(); + cmd.observe(); + } + + stream.observe().take(NUM).subscribe(dashboardData -> { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Received data with : " + dashboardData.commandMetrics.size() + " commands"); + for (HystrixCommandMetrics metrics: dashboardData.commandMetrics) { + if (metrics.getCommandKey().name().equals("SyntheticBlockingCommand")) { + commandShowsUp.set(true); + } + } + }, + Actions.empty(), + () -> { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted"); + latch.countDown(); + }); + + assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(commandShowsUp.get()); + } + + @Test + public void testTwoSubscribersOneUnsubscribes() throws Exception { + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + AtomicInteger payloads1 = new AtomicInteger(0); + AtomicInteger payloads2 = new AtomicInteger(0); + + Subscription s1 = stream + .observe() + .take(100) + .doOnUnsubscribe(latch1::countDown) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted"); + latch1.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e); + latch1.countDown(); + } + + @Override + public void onNext(HystrixDashboardStream.DashboardData dashboardData) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + dashboardData); + payloads1.incrementAndGet(); + } + }); + + Subscription s2 = stream + .observe() + .take(100) + .doOnUnsubscribe(latch2::countDown) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted"); + latch2.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e); + latch2.countDown(); + } + + @Override + public void onNext(HystrixDashboardStream.DashboardData dashboardData) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + dashboardData); + payloads2.incrementAndGet(); + } + }); + //execute 1 command, then unsubscribe from first stream. then execute the rest + for (int i = 0; i < 50; i++) { + HystrixCommand cmd = new SyntheticBlockingCommand(); + cmd.execute(); + if (i == 1) { + s1.unsubscribe(); + } + } + + assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS)); + System.out.println("s1 got : " + payloads1.get() + ", s2 got : " + payloads2.get()); + assertTrue("s1 got data", payloads1.get() > 0); + assertTrue("s2 got data", payloads2.get() > 0); + assertTrue("s1 got less data than s2", payloads2.get() > payloads1.get()); + } + + @Test + public void testTwoSubscribersBothUnsubscribe() throws Exception { + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + AtomicInteger payloads1 = new AtomicInteger(0); + AtomicInteger payloads2 = new AtomicInteger(0); + + Subscription s1 = stream + .observe() + .take(10) + .doOnUnsubscribe(latch1::countDown) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted"); + latch1.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e); + latch1.countDown(); + } + + @Override + public void onNext(HystrixDashboardStream.DashboardData dashboardData) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + dashboardData); + payloads1.incrementAndGet(); + } + }); + + Subscription s2 = stream + .observe() + .take(10) + .doOnUnsubscribe(latch2::countDown) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted"); + latch2.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e); + latch2.countDown(); + } + + @Override + public void onNext(HystrixDashboardStream.DashboardData dashboardData) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + dashboardData); + payloads2.incrementAndGet(); + } + }); + //execute half the commands, then unsubscribe from both streams, then execute the rest + for (int i = 0; i < 50; i++) { + HystrixCommand cmd = new SyntheticBlockingCommand(); + cmd.execute(); + if (i == 25) { + s1.unsubscribe(); + s2.unsubscribe(); + } + } + assertFalse(stream.isSourceCurrentlySubscribed()); //both subscriptions have been cancelled - source should be too + + assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS)); + assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS)); + System.out.println("s1 got : " + payloads1.get() + ", s2 got : " + payloads2.get()); + assertTrue("s1 got data", payloads1.get() > 0); + assertTrue("s2 got data", payloads2.get() > 0); + } + + @Test + public void testTwoSubscribersOneSlowOneFast() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean foundError = new AtomicBoolean(false); + + Observable fast = stream + .observe() + .observeOn(Schedulers.newThread()); + Observable slow = stream + .observe() + .observeOn(Schedulers.newThread()) + .map(n -> { + try { + System.out.println("Sleeping on thread : " + Thread.currentThread().getName()); + Thread.sleep(100); + return n; + } catch (InterruptedException ex) { + return n; + } + }); + + Observable checkZippedEqual = Observable.zip(fast, slow, (payload, payload2) -> payload == payload2); + + Subscription s1 = checkZippedEqual + .take(10000) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnError : " + e); + e.printStackTrace(); + foundError.set(true); + latch.countDown(); + } + + @Override + public void onNext(Boolean b) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnNext : " + b); + } + }); + + for (int i = 0; i < 50; i++) { + HystrixCommand cmd = new SyntheticBlockingCommand(); + cmd.execute(); + } + + latch.await(10000, TimeUnit.MILLISECONDS); + assertFalse(foundError.get()); + } +} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/HystrixStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/HystrixStreamTest.java new file mode 100644 index 000000000..5324b3031 --- /dev/null +++ b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/HystrixStreamTest.java @@ -0,0 +1,65 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.hystrix.contrib.reactivesocket; + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import com.netflix.hystrix.HystrixObservableCommand; +import rx.Observable; +import rx.schedulers.Schedulers; + +import java.util.Random; + +public class HystrixStreamTest { + + static final Random r = new Random(); + + protected static class SyntheticBlockingCommand extends HystrixCommand { + + public SyntheticBlockingCommand() { + super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("UNITTEST"))); + } + + @Override + protected Integer run() throws Exception { + int n = r.nextInt(100); + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " sleeping for : " + n); + Thread.sleep(n); + return n; + } + } + + protected static class SyntheticAsyncCommand extends HystrixObservableCommand { + + public SyntheticAsyncCommand() { + super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("UNITTEST"))); + } + + @Override + protected Observable construct() { + return Observable.defer(() -> { + try { + int n = r.nextInt(100); + Thread.sleep(n); + return Observable.just(n); + } catch (InterruptedException ex) { + return Observable.error(ex); + } + }).subscribeOn(Schedulers.io()); + } + } +} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStreamTest.java deleted file mode 100644 index a65d6ff89..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCollapserMetricsStreamTest.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.netflix.hystrix.contrib.reactivesocket.metrics; - -import com.netflix.hystrix.HystrixCommand; -import com.netflix.hystrix.HystrixCommandGroupKey; -import org.junit.Test; - -import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; - -public class HystrixCollapserMetricsStreamTest { - - @Test - public void test() throws Exception { - CountDownLatch latch = new CountDownLatch(21); - HystrixCommandMetricsStream - .getInstance() - .get() - .subscribe(payload -> { - ByteBuffer data = payload.getData(); - String s = new String(data.array()); - - System.out.println(s); - latch.countDown(); - }); - - - for (int i = 0; i < 20; i++) { - TestCommand test = new TestCommand(); - - test.execute(); - latch.countDown(); - } - - latch.await(); - } - - class TestCommand extends HystrixCommand { - protected TestCommand() { - super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); - } - - @Override - protected Boolean run() throws Exception { - return true; - } - } - - -} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStreamTest.java deleted file mode 100644 index 7cbe344f5..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/metrics/HystrixCommandMetricsStreamTest.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.netflix.hystrix.contrib.reactivesocket.metrics; - -import com.netflix.hystrix.HystrixCommand; -import com.netflix.hystrix.HystrixCommandGroupKey; -import org.junit.Test; - -import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; - -/** - * Created by rroeser on 5/19/16. - */ -public class HystrixCommandMetricsStreamTest { - @Test - public void test() throws Exception { - CountDownLatch latch = new CountDownLatch(23); - HystrixCommandMetricsStream - .getInstance() - .get() - .subscribe(payload -> { - ByteBuffer data = payload.getData(); - String s = new String(data.array()); - - System.out.println(s); - latch.countDown(); - }); - - for (int i = 0; i < 20; i++) { - TestCommand test = new TestCommand(latch); - - test.execute(); - } - - latch.await(); - } - - class TestCommand extends HystrixCommand { - CountDownLatch latch; - protected TestCommand(CountDownLatch latch) { - super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); - this.latch = latch; - } - - @Override - protected Boolean run() throws Exception { - latch.countDown(); - return true; - } - } - -} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStreamTest.java deleted file mode 100644 index 5212bab00..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/sample/HystrixConfigStreamTest.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.netflix.hystrix.contrib.reactivesocket.sample; - -import com.netflix.hystrix.HystrixCommand; -import com.netflix.hystrix.HystrixCommandGroupKey; -import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCommandMetricsStream; -import org.junit.Test; - -import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; - -/** - * Created by rroeser on 5/19/16. - */ -public class HystrixConfigStreamTest { - @Test - public void test() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - HystrixCommandMetricsStream - .getInstance() - .get() - .subscribe(payload -> { - ByteBuffer data = payload.getData(); - String s = new String(data.array()); - - System.out.println(s); - latch.countDown(); - }); - - - for (int i = 0; i < 20; i++) { - TestCommand test = new TestCommand(); - - test.execute(); - } - - latch.await(); - } - - class TestCommand extends HystrixCommand { - protected TestCommand() { - super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); - } - - @Override - protected Boolean run() throws Exception { - System.out.println("IM A HYSTRIX COMMAND!!!!!"); - return true; - } - } -} \ No newline at end of file diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixCommandConfiguration.java b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixCommandConfiguration.java index 1c6a27dac..20c3b2cc6 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixCommandConfiguration.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixCommandConfiguration.java @@ -30,7 +30,7 @@ public class HystrixCommandConfiguration { private final HystrixCommandCircuitBreakerConfig circuitBreakerConfig; private final HystrixCommandMetricsConfig metricsConfig; - private HystrixCommandConfiguration(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, + public HystrixCommandConfiguration(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, HystrixCommandExecutionConfig executionConfig, HystrixCommandCircuitBreakerConfig circuitBreakerConfig, HystrixCommandMetricsConfig metricsConfig) { diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfiguration.java b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfiguration.java index b1933bd41..f2d101ec3 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfiguration.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfiguration.java @@ -26,7 +26,7 @@ public class HystrixConfiguration { private final Map threadPoolConfig; private final Map collapserConfig; - private HystrixConfiguration(Map commandConfig, + public HystrixConfiguration(Map commandConfig, Map threadPoolConfig, Map collapserConfig) { this.commandConfig = commandConfig; diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java index 9d2f50f5a..321a838a3 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java @@ -29,7 +29,7 @@ public class HystrixThreadPoolConfiguration { private final int rollingCounterNumberOfBuckets; private final int rollingCounterBucketSizeInMilliseconds; - private HystrixThreadPoolConfiguration(HystrixThreadPoolKey threadPoolKey, int coreSize, int maxQueueSize, int queueRejectionThreshold, + public HystrixThreadPoolConfiguration(HystrixThreadPoolKey threadPoolKey, int coreSize, int maxQueueSize, int queueRejectionThreshold, int keepAliveTimeInMinutes, int rollingCounterNumberOfBuckets, int rollingCounterBucketSizeInMilliseconds) { this.threadPoolKey = threadPoolKey; diff --git a/hystrix-examples-reactivesocket/client/build.gradle b/hystrix-examples-reactivesocket/client/build.gradle new file mode 100644 index 000000000..169f69bc9 --- /dev/null +++ b/hystrix-examples-reactivesocket/client/build.gradle @@ -0,0 +1,18 @@ +apply plugin: 'application' + +mainClassName = 'com.netflix.hystrix.examples.reactivesocket.HystrixMetricsReactiveSocketClientRunner' +applicationDefaultJvmArgs = ["-Dio.netty.leakDetectionLevel=paranoid"] + +sourceCompatibility = JavaVersion.VERSION_1_8 +targetCompatibility = JavaVersion.VERSION_1_8 + +repositories { + maven { url 'https://dl.bintray.com/reactivesocket/ReactiveSocket' } +} + +dependencies { + compile project(':hystrix-core') + compile project(':hystrix-examples') + compile project(':hystrix-reactivesocket-event-stream') +} + diff --git a/hystrix-examples-reactivesocket/client/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketClientRunner.java b/hystrix-examples-reactivesocket/client/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketClientRunner.java new file mode 100644 index 000000000..70996dcae --- /dev/null +++ b/hystrix-examples-reactivesocket/client/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketClientRunner.java @@ -0,0 +1,128 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.examples.reactivesocket; + +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.config.HystrixCommandConfiguration; +import com.netflix.hystrix.config.HystrixConfiguration; +import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; +import com.netflix.hystrix.contrib.reactivesocket.EventStreamEnum; +import com.netflix.hystrix.contrib.reactivesocket.client.HystrixMetricsReactiveSocketClient; +import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixConfiguration; +import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixMetric; +import com.netflix.hystrix.contrib.reactivesocket.serialize.SerialHystrixUtilization; +import com.netflix.hystrix.metric.sample.HystrixCommandUtilization; +import com.netflix.hystrix.metric.sample.HystrixUtilization; +import io.netty.channel.nio.NioEventLoopGroup; +import io.reactivesocket.Payload; +import org.reactivestreams.Publisher; +import rx.Observable; +import rx.RxReactiveStreams; +import rx.Subscriber; +import rx.Subscription; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class HystrixMetricsReactiveSocketClientRunner { + public static void main(String[] args) throws InterruptedException { + System.out.println("Starting HystrixMetricsReactiveSocketClient..."); + + HystrixMetricsReactiveSocketClient client = new HystrixMetricsReactiveSocketClient("127.0.0.1", 8025, new NioEventLoopGroup()); + client.startAndWait(); + + final EventStreamEnum eventStreamEnum = EventStreamEnum.REQUEST_EVENT_STREAM; + + //Publisher publisher = client.requestResponse(eventStreamEnum); + Publisher publisher = client.requestStream(eventStreamEnum, 10); + //Publisher publisher = client.requestSubscription(eventStreamEnum); + Observable o = RxReactiveStreams.toObservable(publisher); + + final CountDownLatch latch = new CountDownLatch(1); + + Subscription s = o.subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted"); + latch.countDown(); + } + + @Override + public void onError(Throwable e) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnError : " + e); + e.printStackTrace(); + latch.countDown(); + } + + @Override + public void onNext(Payload payload) { + final StringBuilder bldr = new StringBuilder(); + + switch (eventStreamEnum) { + case UTILIZATION_STREAM: + HystrixUtilization u = SerialHystrixUtilization.fromByteBuffer(payload.getData()); + bldr.append("CommandUtil["); + for (Map.Entry entry: u.getCommandUtilizationMap().entrySet()) { + bldr.append(entry.getKey().name()) + .append(" -> ") + .append(entry.getValue().getConcurrentCommandCount()) + .append(", "); + } + bldr.append("]"); + break; + case CONFIG_STREAM: + HystrixConfiguration config = SerialHystrixConfiguration.fromByteBuffer(payload.getData()); + bldr.append("CommandConfig["); + for (Map.Entry entry: config.getCommandConfig().entrySet()) { + bldr.append(entry.getKey().name()) + .append(" -> ") + .append(entry.getValue().getExecutionConfig().getIsolationStrategy().name()) + .append(", "); + } + bldr.append("] ThreadPoolConfig["); + for (Map.Entry entry: config.getThreadPoolConfig().entrySet()) { + bldr.append(entry.getKey().name()) + .append(" -> ") + .append(entry.getValue().getCoreSize()) + .append(", "); + } + bldr.append("]"); + break; + case REQUEST_EVENT_STREAM: + String requestEvents = SerialHystrixMetric.fromByteBufferToString(payload.getData()); + bldr.append("RequestEvents : ").append(requestEvents); + break; + case GENERAL_DASHBOARD_STREAM: + String dashboardData = SerialHystrixMetric.fromByteBufferToString(payload.getData()); + bldr.append("Summary : ").append(dashboardData); + break; + default: + throw new RuntimeException("don't have a way to convert from " + eventStreamEnum + " to string yet"); + } + + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnNext : " + bldr.toString()); + } + }); + + if (!latch.await(10000, TimeUnit.MILLISECONDS)) { + System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Unsubscribing - never received a terminal!"); + s.unsubscribe(); + } + System.exit(0); + } +} diff --git a/hystrix-examples-reactivesocket/server/build.gradle b/hystrix-examples-reactivesocket/server/build.gradle new file mode 100644 index 000000000..0025f125f --- /dev/null +++ b/hystrix-examples-reactivesocket/server/build.gradle @@ -0,0 +1,20 @@ +apply plugin: 'application' + +mainClassName = 'com.netflix.hystrix.examples.reactivesocket.HystrixMetricsReactiveSocketServer' + +applicationDefaultJvmArgs = ["-Dio.netty.leakDetectionLevel=paranoid"] + +sourceCompatibility = JavaVersion.VERSION_1_8 +targetCompatibility = JavaVersion.VERSION_1_8 + +repositories { + maven { url 'https://dl.bintray.com/reactivesocket/ReactiveSocket' } +} + +dependencies { + compile project(':hystrix-core') + compile project(':hystrix-examples') + compile project(':hystrix-reactivesocket-event-stream') + compile 'io.reactivesocket:reactivesocket-netty:0.1.9' +} + diff --git a/hystrix-examples-reactivesocket/server/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketServer.java b/hystrix-examples-reactivesocket/server/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketServer.java new file mode 100644 index 000000000..b23ee9186 --- /dev/null +++ b/hystrix-examples-reactivesocket/server/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketServer.java @@ -0,0 +1,67 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.examples.reactivesocket; + +import com.netflix.hystrix.contrib.reactivesocket.EventStreamRequestHandler; +import com.netflix.hystrix.examples.demo.HystrixCommandAsyncDemo; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.reactivesocket.netty.tcp.server.ReactiveSocketServerHandler; + +public class HystrixMetricsReactiveSocketServer { + + public static void main(String[] args) throws Exception { + System.out.println("Starting HystrixMetricsReactiveSocketServer..."); + + final ReactiveSocketServerHandler handler = ReactiveSocketServerHandler.create((setupPayload, rs) -> + new EventStreamRequestHandler()); + + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(handler); + } + }); + Channel localhost = b.bind("127.0.0.1", 8025).sync().channel(); + + executeCommands(); + localhost.closeFuture().sync(); + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + + private static void executeCommands() { + new HystrixCommandAsyncDemo().startDemo(false); + } +} diff --git a/hystrix-examples/src/main/java/com/netflix/hystrix/examples/demo/HystrixCommandAsyncDemo.java b/hystrix-examples/src/main/java/com/netflix/hystrix/examples/demo/HystrixCommandAsyncDemo.java index 58fbbe9ec..de228b159 100644 --- a/hystrix-examples/src/main/java/com/netflix/hystrix/examples/demo/HystrixCommandAsyncDemo.java +++ b/hystrix-examples/src/main/java/com/netflix/hystrix/examples/demo/HystrixCommandAsyncDemo.java @@ -18,18 +18,24 @@ import java.math.BigDecimal; import java.net.HttpCookie; import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import com.netflix.config.ConfigurationManager; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixCommandMetrics; import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts; import com.netflix.hystrix.HystrixRequestLog; +import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable; import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; import rx.Observable; +import rx.Subscriber; import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func1; import rx.functions.Func2; +import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaSchedulersHook; /** * Executable client that demonstrates the lifecycle, metrics, request log and behavior of HystrixCommands. @@ -40,6 +46,26 @@ public class HystrixCommandAsyncDemo { // new HystrixCommandAsyncDemo().startDemo(true); // } + static class ContextAwareRxSchedulersHook extends RxJavaSchedulersHook { + @Override + public Action0 onSchedule(final Action0 initialAction) { + final Runnable initialRunnable = new Runnable() { + @Override + public void run() { + initialAction.call(); + } + }; + final Runnable wrappedRunnable = + new HystrixContextRunnable(initialRunnable); + return new Action0() { + @Override + public void call() { + wrappedRunnable.run(); + } + }; + } + } + public HystrixCommandAsyncDemo() { /* * Instead of using injected properties we'll set them via Archaius @@ -51,45 +77,49 @@ public HystrixCommandAsyncDemo() { ConfigurationManager.getConfigInstance().setProperty("hystrix.command.GetUserAccountCommand.execution.isolation.thread.timeoutInMilliseconds", 50); // set the rolling percentile more granular so we see data change every second rather than every 10 seconds as is the default ConfigurationManager.getConfigInstance().setProperty("hystrix.command.default.metrics.rollingPercentile.numBuckets", 60); + + RxJavaPlugins.getInstance().registerSchedulersHook(new ContextAwareRxSchedulersHook()); } - public void startDemo(boolean shouldLog) { + public void startDemo(final boolean shouldLog) { startMetricsMonitor(shouldLog); while (true) { - Observable o = runSimulatedRequestOnCallerThread(shouldLog); - o.subscribe(); + final HystrixRequestContext context = HystrixRequestContext.initializeContext(); + Observable o = observeSimulatedUserRequestForOrderConfirmationAndCreditCardPayment(); + + final CountDownLatch latch = new CountDownLatch(1); + o.subscribe(new Subscriber() { + @Override + public void onCompleted() { + latch.countDown(); + context.shutdown(); + } + + @Override + public void onError(Throwable e) { + e.printStackTrace(); + latch.countDown(); + context.shutdown(); + } + + @Override + public void onNext(CreditCardAuthorizationResult creditCardAuthorizationResult) { + if (shouldLog) { + System.out.println("Request => " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString()); + } + } + }); + try { - Thread.sleep(400); + latch.await(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { - ex.printStackTrace(); + System.out.println("INTERRUPTED!"); } } } private final static Random r = new Random(); - private Observable runSimulatedRequestOnCallerThread(final boolean shouldLog) { - final HystrixRequestContext context = HystrixRequestContext.initializeContext(); - //System.out.println("Map at start : " + map.get() + " : " + Thread.currentThread().getName()); - - return observeSimulatedUserRequestForOrderConfirmationAndCreditCardPayment() - .doOnUnsubscribe(new Action0() { - @Override - public void call() { - if (shouldLog) { - System.out.println("Request => " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString()); - } - context.shutdown(); - } - }) - .doOnError(new Action1() { - @Override - public void call(Throwable throwable) { - throwable.printStackTrace(); - } - }); - } - private class Pair { private final A a; private final B b; @@ -110,30 +140,36 @@ B b() { public Observable observeSimulatedUserRequestForOrderConfirmationAndCreditCardPayment() { /* fetch user object with http cookies */ - Observable user = new GetUserAccountCommand(new HttpCookie("mockKey", "mockValueFromHttpRequest")).observe(); + try { + Observable user = new GetUserAccountCommand(new HttpCookie("mockKey", "mockValueFromHttpRequest")).observe(); + /* fetch the payment information (asynchronously) for the user so the credit card payment can proceed */ + Observable paymentInformation = user.flatMap(new Func1>() { + @Override + public Observable call(UserAccount userAccount) { + return new GetPaymentInformationCommand(userAccount).observe(); + } + }); + + /* fetch the order we're processing for the user */ + int orderIdFromRequestArgument = 13579; + final Observable previouslySavedOrder = new GetOrderCommand(orderIdFromRequestArgument).observe(); + + return Observable.zip(paymentInformation, previouslySavedOrder, new Func2>() { + @Override + public Pair call(PaymentInformation paymentInformation, Order order) { + return new Pair(paymentInformation, order); + } + }).flatMap(new Func1, Observable>() { + @Override + public Observable call(Pair pair) { + return new CreditCardCommand(pair.b(), pair.a(), new BigDecimal(123.45)).observe(); + } + }); + } catch (IllegalArgumentException ex) { + return Observable.error(ex); + } - /* fetch the payment information (asynchronously) for the user so the credit card payment can proceed */ - Observable paymentInformation = user.flatMap(new Func1>() { - @Override - public Observable call(UserAccount userAccount) { - return new GetPaymentInformationCommand(userAccount).observe(); - } - }); - /* fetch the order we're processing for the user */ - int orderIdFromRequestArgument = 13579; - final Observable previouslySavedOrder = new GetOrderCommand(orderIdFromRequestArgument).observe(); - return Observable.zip(paymentInformation, previouslySavedOrder, new Func2>() { - @Override - public Pair call(PaymentInformation paymentInformation, Order order) { - return new Pair(paymentInformation, order); - } - }).flatMap(new Func1, Observable>() { - @Override - public Observable call(Pair pair) { - return new CreditCardCommand(pair.b(), pair.a(), new BigDecimal(123.45)).observe(); - } - }); } public void startMetricsMonitor(final boolean shouldLog) { diff --git a/settings.gradle b/settings.gradle index 24d313735..a34cb522b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -2,6 +2,8 @@ rootProject.name='hystrix' include 'hystrix-core', \ 'hystrix-examples', \ 'hystrix-examples-webapp', \ +'hystrix-examples-reactivesocket/client', \ +'hystrix-examples-reactivesocket/server', \ 'hystrix-contrib/hystrix-clj', \ 'hystrix-contrib/hystrix-request-servlet', \ 'hystrix-contrib/hystrix-servo-metrics-publisher', \ @@ -15,6 +17,8 @@ include 'hystrix-core', \ 'hystrix-contrib/hystrix-junit', \ 'hystrix-dashboard' +project(':hystrix-examples-reactivesocket/client').name = 'hystrix-examples-reactivesocket-client' +project(':hystrix-examples-reactivesocket/server').name = 'hystrix-examples-reactivesocket-server' project(':hystrix-contrib/hystrix-clj').name = 'hystrix-clj' project(':hystrix-contrib/hystrix-request-servlet').name = 'hystrix-request-servlet' project(':hystrix-contrib/hystrix-servo-metrics-publisher').name = 'hystrix-servo-metrics-publisher'