Skip to content

Commit

Permalink
Wired HystrixMetricsStreamServlet to HystrixDashboardData
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Jacobs committed Jun 22, 2016
1 parent 151a4a0 commit a2ee1c4
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -48,7 +48,7 @@
* </servlet-mapping>
* } </pre>
*/
public class HystrixMetricsStreamServlet extends HystrixSampleSseServlet<HystrixDashboardStream.DashboardData> {
public class HystrixMetricsStreamServlet extends HystrixSampleSseServlet {

private static final long serialVersionUID = -7548505095303313237L;

Expand All @@ -60,11 +60,16 @@ public class HystrixMetricsStreamServlet extends HystrixSampleSseServlet<Hystrix
DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);

public HystrixMetricsStreamServlet() {
super(HystrixDashboardStream.getInstance().observe());
this(HystrixDashboardStream.getInstance().observe(), DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS);
}

/* package-private */ HystrixMetricsStreamServlet(Observable<HystrixDashboardStream.DashboardData> sampleStream, int pausePollerThreadDelayInMs) {
super(sampleStream, pausePollerThreadDelayInMs);
super(sampleStream.concatMap(new Func1<HystrixDashboardStream.DashboardData, Observable<String>>() {
@Override
public Observable<String> call(HystrixDashboardStream.DashboardData dashboardData) {
return Observable.from(SerialHystrixDashboardData.toMultipleJsonStrings(dashboardData));
}
}), pausePollerThreadDelayInMs);
}

@Override
Expand All @@ -86,9 +91,4 @@ protected int incrementAndGetCurrentConcurrentConnections() {
protected void decrementCurrentConcurrentConnections() {
concurrentConnections.decrementAndGet();
}

@Override
protected String convertToString(HystrixDashboardStream.DashboardData dashboardData) throws IOException {
return SerialHystrixDashboardData.toJsonString(dashboardData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.hystrix.config.HystrixConfiguration;
import com.netflix.hystrix.config.HystrixConfigurationStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -45,20 +48,31 @@
* </servlet-mapping>
* } </pre>
*/
public class HystrixConfigSseServlet extends HystrixSampleSseServlet<HystrixConfiguration> {
public class HystrixConfigSseServlet extends HystrixSampleSseServlet {

private static final long serialVersionUID = -3599771169762858235L;
private static final Logger logger = LoggerFactory.getLogger(HystrixConfigSseServlet.class);

/* used to track number of connections and throttle */
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);

public HystrixConfigSseServlet() {
super(HystrixConfigurationStream.getInstance().observe());
this(HystrixConfigurationStream.getInstance().observe(), DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS);
}

/* package-private */ HystrixConfigSseServlet(Observable<HystrixConfiguration> sampleStream, int pausePollerThreadDelayInMs) {
super(sampleStream, pausePollerThreadDelayInMs);
super(sampleStream.map(new Func1<HystrixConfiguration, String>() {
@Override
public String call(HystrixConfiguration hystrixConfiguration) {
try {
return HystrixConfigurationJsonStream.convertToString(hystrixConfiguration);
} catch (IOException ioe) {
logger.error("IOException creating JSON from HystrixUtilization", ioe);
return "<IOException> : " + ioe.getMessage();
}
}
}), pausePollerThreadDelayInMs);
}

@Override
Expand All @@ -80,10 +94,5 @@ protected int incrementAndGetCurrentConcurrentConnections() {
protected void decrementCurrentConcurrentConnections() {
concurrentConnections.decrementAndGet();
}

@Override
protected String convertToString(HystrixConfiguration config) throws IOException {
return HystrixConfigurationJsonStream.convertToString(config);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,25 @@

/**
*/
public abstract class HystrixSampleSseServlet<SampleData> extends HttpServlet {
protected final Observable<SampleData> sampleStream;
public abstract class HystrixSampleSseServlet extends HttpServlet {
protected final Observable<String> sampleStream;

private static final Logger logger = LoggerFactory.getLogger(HystrixSampleSseServlet.class);

//wake up occasionally and check that poller is still alive. this value controls how often
private static final int DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS = 500;
protected static final int DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS = 500;

private final int pausePollerThreadDelayInMs;

/* Set to true upon shutdown, so it's OK to be shared among all SampleSseServlets */
private static volatile boolean isDestroyed = false;

protected HystrixSampleSseServlet(Observable<SampleData> sampleStream) {
protected HystrixSampleSseServlet(Observable<String> sampleStream) {
this.sampleStream = sampleStream;
this.pausePollerThreadDelayInMs = DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS;
}

protected HystrixSampleSseServlet(Observable<SampleData> sampleStream, int pausePollerThreadDelayInMs) {
protected HystrixSampleSseServlet(Observable<String> sampleStream, int pausePollerThreadDelayInMs) {
this.sampleStream = sampleStream;
this.pausePollerThreadDelayInMs = pausePollerThreadDelayInMs;
}
Expand All @@ -63,10 +63,6 @@ protected HystrixSampleSseServlet(Observable<SampleData> sampleStream, int pause

protected abstract void decrementCurrentConcurrentConnections();

//protected abstract Observable<SampleData> getStream();

protected abstract String convertToString(SampleData sampleData) throws IOException;

/**
* Handle incoming GETs
*/
Expand Down Expand Up @@ -135,7 +131,7 @@ private void handleRequest(HttpServletRequest request, final HttpServletResponse
//since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext
sampleSubscription = sampleStream
.observeOn(Schedulers.io())
.subscribe(new Subscriber<SampleData>() {
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
logger.error("HystrixSampleSseServlet: ({}) received unexpected OnCompleted from sample stream", getClass().getSimpleName());
Expand All @@ -148,26 +144,17 @@ public void onError(Throwable e) {
}

@Override
public void onNext(SampleData sampleData) {
if (sampleData != null) {
String sampleDataAsStr = null;
public void onNext(String sampleDataAsString) {
if (sampleDataAsString != null) {
try {
sampleDataAsStr = convertToString(sampleData);
} catch (IOException ioe) {
//exception while converting String to JSON
logger.error("Error converting configuration to JSON ", ioe);
}
if (sampleDataAsStr != null) {
try {
writer.print("data: " + sampleDataAsStr + "\n\n");
// explicitly check for client disconnect - PrintWriter does not throw exceptions
if (writer.checkError()) {
throw new IOException("io error");
}
writer.flush();
} catch (IOException ioe) {
moreDataWillBeSent.set(false);
writer.print("data: " + sampleDataAsString + "\n\n");
// explicitly check for client disconnect - PrintWriter does not throw exceptions
if (writer.checkError()) {
throw new IOException("io error");
}
writer.flush();
} catch (IOException ioe) {
moreDataWillBeSent.set(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.hystrix.metric.sample.HystrixUtilization;
import com.netflix.hystrix.metric.sample.HystrixUtilizationStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -45,21 +48,32 @@
* </servlet-mapping>
* } </pre>
*/
public class HystrixUtilizationSseServlet extends HystrixSampleSseServlet<HystrixUtilization> {
public class HystrixUtilizationSseServlet extends HystrixSampleSseServlet {

private static final long serialVersionUID = -7812908330777694972L;
private static final Logger logger = LoggerFactory.getLogger(HystrixUtilizationSseServlet.class);

/* used to track number of connections and throttle */
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections =
DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);

public HystrixUtilizationSseServlet() {
super(HystrixUtilizationStream.getInstance().observe());
this(HystrixUtilizationStream.getInstance().observe(), DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS);
}

/* package-private */ HystrixUtilizationSseServlet(Observable<HystrixUtilization> sampleStream, int pausePollerThreadDelayInMs) {
super(sampleStream, pausePollerThreadDelayInMs);
super(sampleStream.map(new Func1<HystrixUtilization, String>() {
@Override
public String call(HystrixUtilization hystrixUtilization) {
try {
return HystrixUtilizationJsonStream.convertToJson(hystrixUtilization);
} catch (IOException ioe) {
logger.error("IOException creating JSON from HystrixUtilization", ioe);
return "<IOException> : " + ioe.getMessage();
}
}
}), pausePollerThreadDelayInMs);
}

@Override
Expand All @@ -81,10 +95,5 @@ protected int incrementAndGetCurrentConcurrentConnections() {
protected void decrementCurrentConcurrentConnections() {
concurrentConnections.decrementAndGet();
}

@Override
protected String convertToString(HystrixUtilization utilization) throws IOException {
return HystrixUtilizationJsonStream.convertToJson(utilization);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
import com.netflix.hystrix.metric.consumer.HystrixDashboardStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func0;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;

public class SerialHystrixDashboardData extends SerialHystrixMetric {

Expand Down Expand Up @@ -67,6 +70,24 @@ public static String toJsonString(HystrixDashboardStream.DashboardData dashboard
return jsonString.getBuffer().toString();
}

public static List<String> toMultipleJsonStrings(HystrixDashboardStream.DashboardData dashboardData) {
List<String> jsonStrings = new ArrayList<String>();

for (HystrixCommandMetrics commandMetrics : dashboardData.getCommandMetrics()) {
jsonStrings.add(getCommandMetrics(commandMetrics));
}

for (HystrixThreadPoolMetrics threadPoolMetrics : dashboardData.getThreadPoolMetrics()) {
jsonStrings.add(getThreadPoolMetrics(threadPoolMetrics));
}

for (HystrixCollapserMetrics collapserMetrics : dashboardData.getCollapserMetrics()) {
jsonStrings.add(getCollapserMetrics(collapserMetrics));
}

return jsonStrings;
}

private static void writeDashboardData(JsonGenerator json, HystrixDashboardStream.DashboardData dashboardData) {
try {
json.writeStartArray();
Expand All @@ -91,6 +112,45 @@ private static void writeDashboardData(JsonGenerator json, HystrixDashboardStrea
}
}

private static String getCommandMetrics(HystrixCommandMetrics commandMetrics) {
StringWriter jsonString = new StringWriter();

try {
JsonGenerator json = jsonFactory.createGenerator(jsonString);
writeCommandMetrics(commandMetrics, json);
json.close();
return jsonString.getBuffer().toString();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

private static String getThreadPoolMetrics(HystrixThreadPoolMetrics threadPoolMetrics) {
StringWriter jsonString = new StringWriter();

try {
JsonGenerator json = jsonFactory.createGenerator(jsonString);
writeThreadPoolMetrics(threadPoolMetrics, json);
json.close();
return jsonString.getBuffer().toString();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

private static String getCollapserMetrics(HystrixCollapserMetrics collapserMetrics) {
StringWriter jsonString = new StringWriter();

try {
JsonGenerator json = jsonFactory.createGenerator(jsonString);
writeCollapserMetrics(collapserMetrics, json);
json.close();
return jsonString.getBuffer().toString();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

private static void writeCommandMetrics(final HystrixCommandMetrics commandMetrics, JsonGenerator json) throws IOException {
HystrixCommandKey key = commandMetrics.getCommandKey();
HystrixCircuitBreaker circuitBreaker = HystrixCircuitBreaker.Factory.getInstance(key);
Expand Down

0 comments on commit a2ee1c4

Please sign in to comment.