Skip to content

Commit

Permalink
initial commit ReactiveSocket event streams
Browse files Browse the repository at this point in the history
  • Loading branch information
robertroeser committed May 18, 2016
1 parent b63fdf6 commit f6e1eb8
Show file tree
Hide file tree
Showing 9 changed files with 732 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.netflix.hystrix.contrib.reactivesocket;


public enum EventStreamEnum {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.netflix.hystrix.contrib.reactivesocket;

/**
* Created by rroeser on 5/17/16.
*/
public class EventStreamRequestHandler {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.netflix.hystrix.contrib.reactivesocket;

/**
* Created by rroeser on 5/18/16.
*/
public class StreamingSupplier {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.netflix.hystrix.contrib.reactivesocket.metrics;


import com.fasterxml.jackson.core.JsonGenerator;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserMetrics;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.contrib.reactivesocket.StreamingSupplier;
import org.agrona.LangUtil;
import rx.functions.Func0;

import java.io.ByteArrayOutputStream;
import java.util.stream.Stream;

public class HystrixCollasperMetricsStream extends StreamingSupplier<HystrixCollapserMetrics> {
private static HystrixCollasperMetricsStream INSTANCE = new HystrixCollasperMetricsStream();

private HystrixCollasperMetricsStream() {
super();
}

public static HystrixCollasperMetricsStream getInstance() {
return INSTANCE;
}

@Override
protected Stream getStream() {
return HystrixCollapserMetrics.getInstances().stream();
}

protected byte[] getPayloadData(final HystrixCollapserMetrics collapserMetrics) {
byte[] retVal = null;
try {
HystrixCollapserKey key = collapserMetrics.getCollapserKey();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
JsonGenerator json = jsonFactory.createGenerator(bos);
json.writeStartObject();

json.writeStringField("type", "HystrixCollapser");
json.writeStringField("name", key.name());
json.writeNumberField("currentTime", System.currentTimeMillis());

safelyWriteNumberField(json, "rollingCountRequestsBatched", new Func0<Long>() {
@Override
public Long call() {
return collapserMetrics.getRollingCount(HystrixEventType.Collapser.ADDED_TO_BATCH);
}
});
safelyWriteNumberField(json, "rollingCountBatches", new Func0<Long>() {
@Override
public Long call() {
return collapserMetrics.getRollingCount(HystrixEventType.Collapser.BATCH_EXECUTED);
}
});
safelyWriteNumberField(json, "rollingCountResponsesFromCache", new Func0<Long>() {
@Override
public Long call() {
return collapserMetrics.getRollingCount(HystrixEventType.Collapser.RESPONSE_FROM_CACHE);
}
});

// batch size percentiles
json.writeNumberField("batchSize_mean", collapserMetrics.getBatchSizeMean());
json.writeObjectFieldStart("batchSize");
json.writeNumberField("25", collapserMetrics.getBatchSizePercentile(25));
json.writeNumberField("50", collapserMetrics.getBatchSizePercentile(50));
json.writeNumberField("75", collapserMetrics.getBatchSizePercentile(75));
json.writeNumberField("90", collapserMetrics.getBatchSizePercentile(90));
json.writeNumberField("95", collapserMetrics.getBatchSizePercentile(95));
json.writeNumberField("99", collapserMetrics.getBatchSizePercentile(99));
json.writeNumberField("99.5", collapserMetrics.getBatchSizePercentile(99.5));
json.writeNumberField("100", collapserMetrics.getBatchSizePercentile(100));
json.writeEndObject();

// shard size percentiles (commented-out for now)
//json.writeNumberField("shardSize_mean", collapserMetrics.getShardSizeMean());
//json.writeObjectFieldStart("shardSize");
//json.writeNumberField("25", collapserMetrics.getShardSizePercentile(25));
//json.writeNumberField("50", collapserMetrics.getShardSizePercentile(50));
//json.writeNumberField("75", collapserMetrics.getShardSizePercentile(75));
//json.writeNumberField("90", collapserMetrics.getShardSizePercentile(90));
//json.writeNumberField("95", collapserMetrics.getShardSizePercentile(95));
//json.writeNumberField("99", collapserMetrics.getShardSizePercentile(99));
//json.writeNumberField("99.5", collapserMetrics.getShardSizePercentile(99.5));
//json.writeNumberField("100", collapserMetrics.getShardSizePercentile(100));
//json.writeEndObject();

//json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", collapserMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get());
json.writeBooleanField("propertyValue_requestCacheEnabled", collapserMetrics.getProperties().requestCacheEnabled().get());
json.writeNumberField("propertyValue_maxRequestsInBatch", collapserMetrics.getProperties().maxRequestsInBatch().get());
json.writeNumberField("propertyValue_timerDelayInMilliseconds", collapserMetrics.getProperties().timerDelayInMilliseconds().get());

json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster

json.writeEndObject();
json.close();

retVal = bos.toByteArray();
} catch (Exception e) {
LangUtil.rethrowUnchecked(e);
}

return retVal;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package com.netflix.hystrix.contrib.reactivesocket.metrics;

import com.fasterxml.jackson.core.JsonGenerator;
import com.netflix.hystrix.HystrixCircuitBreaker;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandMetrics;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.contrib.reactivesocket.StreamingSupplier;
import org.agrona.LangUtil;
import rx.functions.Func0;

import java.io.ByteArrayOutputStream;
import java.util.stream.Stream;

public class HystrixCommandMetricsStream extends StreamingSupplier<HystrixCommandMetrics> {
private static final HystrixCommandMetricsStream INSTANCE = new HystrixCommandMetricsStream();

private HystrixCommandMetricsStream() {
super();
}

public static HystrixCommandMetricsStream getInstance() {
return INSTANCE;
}

@Override
protected Stream<HystrixCommandMetrics> getStream() {
return HystrixCommandMetrics.getInstances().stream();
}

protected byte[] getPayloadData(final HystrixCommandMetrics commandMetrics) {
byte[] retVal = null;

try

{
HystrixCommandKey key = commandMetrics.getCommandKey();
HystrixCircuitBreaker circuitBreaker = HystrixCircuitBreaker.Factory.getInstance(key);

ByteArrayOutputStream bos = new ByteArrayOutputStream();

JsonGenerator json = jsonFactory.createGenerator(bos);

json.writeStartObject();
json.writeStringField("type", "HystrixCommand");
json.writeStringField("name", key.name());
json.writeStringField("group", commandMetrics.getCommandGroup().name());
json.writeNumberField("currentTime", System.currentTimeMillis());

// circuit breaker
if (circuitBreaker == null) {
// circuit breaker is disabled and thus never open
json.writeBooleanField("isCircuitBreakerOpen", false);
} else {
json.writeBooleanField("isCircuitBreakerOpen", circuitBreaker.isOpen());
}
HystrixCommandMetrics.HealthCounts healthCounts = commandMetrics.getHealthCounts();
json.writeNumberField("errorPercentage", healthCounts.getErrorPercentage());
json.writeNumberField("errorCount", healthCounts.getErrorCount());
json.writeNumberField("requestCount", healthCounts.getTotalRequests());

// rolling counters
safelyWriteNumberField(json, "rollingCountBadRequests", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.BAD_REQUEST);
}
});
safelyWriteNumberField(json, "rollingCountCollapsedRequests", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.COLLAPSED);
}
});
safelyWriteNumberField(json, "rollingCountEmit", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.EMIT);
}
});
safelyWriteNumberField(json, "rollingCountExceptionsThrown", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.EXCEPTION_THROWN);
}
});
safelyWriteNumberField(json, "rollingCountFailure", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.FAILURE);
}
});
safelyWriteNumberField(json, "rollingCountFallbackEmit", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_EMIT);
}
});
safelyWriteNumberField(json, "rollingCountFallbackFailure", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_FAILURE);
}
});
safelyWriteNumberField(json, "rollingCountFallbackMissing", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_MISSING);
}
});
safelyWriteNumberField(json, "rollingCountFallbackRejection", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_REJECTION);
}
});
safelyWriteNumberField(json, "rollingCountFallbackSuccess", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.FALLBACK_SUCCESS);
}
});
safelyWriteNumberField(json, "rollingCountResponsesFromCache", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.RESPONSE_FROM_CACHE);
}
});
safelyWriteNumberField(json, "rollingCountSemaphoreRejected", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.SEMAPHORE_REJECTED);
}
});
safelyWriteNumberField(json, "rollingCountShortCircuited", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.SHORT_CIRCUITED);
}
});
safelyWriteNumberField(json, "rollingCountSuccess", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.SUCCESS);
}
});
safelyWriteNumberField(json, "rollingCountThreadPoolRejected", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.THREAD_POOL_REJECTED);
}
});
safelyWriteNumberField(json, "rollingCountTimeout", new Func0<Long>() {
@Override
public Long call() {
return commandMetrics.getRollingCount(HystrixEventType.TIMEOUT);
}
});

json.writeNumberField("currentConcurrentExecutionCount", commandMetrics.getCurrentConcurrentExecutionCount());
json.writeNumberField("rollingMaxConcurrentExecutionCount", commandMetrics.getRollingMaxConcurrentExecutions());

// latency percentiles
json.writeNumberField("latencyExecute_mean", commandMetrics.getExecutionTimeMean());
json.writeObjectFieldStart("latencyExecute");
json.writeNumberField("0", commandMetrics.getExecutionTimePercentile(0));
json.writeNumberField("25", commandMetrics.getExecutionTimePercentile(25));
json.writeNumberField("50", commandMetrics.getExecutionTimePercentile(50));
json.writeNumberField("75", commandMetrics.getExecutionTimePercentile(75));
json.writeNumberField("90", commandMetrics.getExecutionTimePercentile(90));
json.writeNumberField("95", commandMetrics.getExecutionTimePercentile(95));
json.writeNumberField("99", commandMetrics.getExecutionTimePercentile(99));
json.writeNumberField("99.5", commandMetrics.getExecutionTimePercentile(99.5));
json.writeNumberField("100", commandMetrics.getExecutionTimePercentile(100));
json.writeEndObject();
//
json.writeNumberField("latencyTotal_mean", commandMetrics.getTotalTimeMean());
json.writeObjectFieldStart("latencyTotal");
json.writeNumberField("0", commandMetrics.getTotalTimePercentile(0));
json.writeNumberField("25", commandMetrics.getTotalTimePercentile(25));
json.writeNumberField("50", commandMetrics.getTotalTimePercentile(50));
json.writeNumberField("75", commandMetrics.getTotalTimePercentile(75));
json.writeNumberField("90", commandMetrics.getTotalTimePercentile(90));
json.writeNumberField("95", commandMetrics.getTotalTimePercentile(95));
json.writeNumberField("99", commandMetrics.getTotalTimePercentile(99));
json.writeNumberField("99.5", commandMetrics.getTotalTimePercentile(99.5));
json.writeNumberField("100", commandMetrics.getTotalTimePercentile(100));
json.writeEndObject();

// property values for reporting what is actually seen by the command rather than what was set somewhere
HystrixCommandProperties commandProperties = commandMetrics.getProperties();

json.writeNumberField("propertyValue_circuitBreakerRequestVolumeThreshold", commandProperties.circuitBreakerRequestVolumeThreshold().get());
json.writeNumberField("propertyValue_circuitBreakerSleepWindowInMilliseconds", commandProperties.circuitBreakerSleepWindowInMilliseconds().get());
json.writeNumberField("propertyValue_circuitBreakerErrorThresholdPercentage", commandProperties.circuitBreakerErrorThresholdPercentage().get());
json.writeBooleanField("propertyValue_circuitBreakerForceOpen", commandProperties.circuitBreakerForceOpen().get());
json.writeBooleanField("propertyValue_circuitBreakerForceClosed", commandProperties.circuitBreakerForceClosed().get());
json.writeBooleanField("propertyValue_circuitBreakerEnabled", commandProperties.circuitBreakerEnabled().get());

json.writeStringField("propertyValue_executionIsolationStrategy", commandProperties.executionIsolationStrategy().get().name());
json.writeNumberField("propertyValue_executionIsolationThreadTimeoutInMilliseconds", commandProperties.executionTimeoutInMilliseconds().get());
json.writeNumberField("propertyValue_executionTimeoutInMilliseconds", commandProperties.executionTimeoutInMilliseconds().get());
json.writeBooleanField("propertyValue_executionIsolationThreadInterruptOnTimeout", commandProperties.executionIsolationThreadInterruptOnTimeout().get());
json.writeStringField("propertyValue_executionIsolationThreadPoolKeyOverride", commandProperties.executionIsolationThreadPoolKeyOverride().get());
json.writeNumberField("propertyValue_executionIsolationSemaphoreMaxConcurrentRequests", commandProperties.executionIsolationSemaphoreMaxConcurrentRequests().get());
json.writeNumberField("propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests", commandProperties.fallbackIsolationSemaphoreMaxConcurrentRequests().get());

/*
* The following are commented out as these rarely change and are verbose for streaming for something people don't change.
* We could perhaps allow a property or request argument to include these.
*/

// json.put("propertyValue_metricsRollingPercentileEnabled", commandProperties.metricsRollingPercentileEnabled().get());
// json.put("propertyValue_metricsRollingPercentileBucketSize", commandProperties.metricsRollingPercentileBucketSize().get());
// json.put("propertyValue_metricsRollingPercentileWindow", commandProperties.metricsRollingPercentileWindowInMilliseconds().get());
// json.put("propertyValue_metricsRollingPercentileWindowBuckets", commandProperties.metricsRollingPercentileWindowBuckets().get());
// json.put("propertyValue_metricsRollingStatisticalWindowBuckets", commandProperties.metricsRollingStatisticalWindowBuckets().get());
json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", commandProperties.metricsRollingStatisticalWindowInMilliseconds().get());

json.writeBooleanField("propertyValue_requestCacheEnabled", commandProperties.requestCacheEnabled().get());
json.writeBooleanField("propertyValue_requestLogEnabled", commandProperties.requestLogEnabled().get());

json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster
json.writeStringField("threadPool", commandMetrics.getThreadPoolKey().name());

json.writeEndObject();
json.close();

retVal = bos.toByteArray();
} catch (Exception t) {
LangUtil.rethrowUnchecked(t);
}

return retVal;
}
}


Loading

0 comments on commit f6e1eb8

Please sign in to comment.