diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsSseServlet.java new file mode 100644 index 000000000..c1b606bc4 --- /dev/null +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsSseServlet.java @@ -0,0 +1,274 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.requests.stream; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.config.DynamicIntProperty; +import com.netflix.config.DynamicPropertyFactory; +import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.HystrixInvokableInfo; +import com.netflix.hystrix.metric.HystrixRequestEvents; +import com.netflix.hystrix.metric.HystrixRequestEventsStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Subscriber; +import rx.Subscription; +import rx.schedulers.Schedulers; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + */ +public class HystrixRequestEventsSseServlet extends HttpServlet { + + private static final Logger logger = LoggerFactory.getLogger(HystrixRequestEventsSseServlet.class); + + private static volatile boolean isDestroyed = false; + + private static final String DELAY_REQ_PARAM_NAME = "delay"; + private static final int DEFAULT_DELAY_IN_MILLISECONDS = 10000; + private static final int DEFAULT_QUEUE_DEPTH = 1000; + private static final String PING = "\n: ping\n"; + + /* used to track number of connections and throttle */ + private static AtomicInteger concurrentConnections = new AtomicInteger(0); + private static DynamicIntProperty maxConcurrentConnections = + DynamicPropertyFactory.getInstance().getIntProperty("hystrix.requests.stream.maxConcurrentConnections", 5); + + private final LinkedBlockingQueue requestQueue = new LinkedBlockingQueue(DEFAULT_QUEUE_DEPTH); + private final JsonFactory jsonFactory = new JsonFactory(); + + /** + * Handle incoming GETs + */ + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + if (isDestroyed) { + response.sendError(503, "Service has been shut down."); + } else { + handleRequest(request, response); + } + } + + /* package-private */ + int getDelayFromHttpRequest(HttpServletRequest req) { + try { + String delay = req.getParameter(DELAY_REQ_PARAM_NAME); + if (delay != null) { + return Math.max(Integer.parseInt(delay), 1); + } + } catch (Throwable ex) { + //silently fail + } + return DEFAULT_DELAY_IN_MILLISECONDS; + } + + /** + * WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing + * a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at + * shutdown, perhaps from some other servlet's destroy() method. + */ + public static void shutdown() { + isDestroyed = true; + } + + @Override + public void init() throws ServletException { + isDestroyed = false; + } + + /** + * Handle servlet being undeployed by gracefully releasing connections so poller threads stop. + */ + @Override + public void destroy() { + /* set marker so the loops can break out */ + isDestroyed = true; + super.destroy(); + } + + private String convertToString(Collection requests) throws IOException { + StringWriter jsonString = new StringWriter(); + JsonGenerator json = jsonFactory.createGenerator(jsonString); + + json.writeStartArray(); + for (HystrixRequestEvents request : requests) { + convertRequestToJson(json, request); + } + json.writeEndArray(); + json.close(); + return jsonString.getBuffer().toString(); + } + + private void convertRequestToJson(JsonGenerator json, HystrixRequestEvents request) throws IOException { + json.writeStartObject(); + json.writeStringField("request", request.getRequestContext().toString()); + json.writeObjectFieldStart("commands"); + for (HystrixInvokableInfo execution: request.getExecutions()) { + convertExecutionToJson(json, execution); + } + json.writeEndObject(); + json.writeEndObject(); + } + + private void convertExecutionToJson(JsonGenerator json, HystrixInvokableInfo execution) throws IOException { + json.writeObjectFieldStart(execution.getCommandKey().name()); + json.writeNumberField("latency", execution.getExecutionTimeInMilliseconds()); + json.writeArrayFieldStart("events"); + for (HystrixEventType eventType: execution.getExecutionEvents()) { + switch (eventType) { + case EMIT: + json.writeStartObject(); + json.writeNumberField(eventType.name(), execution.getNumberEmissions()); + json.writeEndObject(); + break; + case FALLBACK_EMIT: + json.writeStartObject(); + json.writeNumberField(eventType.name(), execution.getNumberFallbackEmissions()); + json.writeEndObject(); + break; + case COLLAPSED: + json.writeStartObject(); + json.writeNumberField(eventType.name(), execution.getNumberCollapsed()); + json.writeEndObject(); + break; + default: + json.writeString(eventType.name()); + break; + } + } + json.writeEndArray(); + json.writeEndObject(); + } + + /** + * - maintain an open connection with the client + * - on initial connection send latest data of each requested event type + * - subsequently send all changes for each requested event type + * + * @param request incoming HTTP Request + * @param response outgoing HTTP Response (as a streaming response) + * @throws javax.servlet.ServletException + * @throws java.io.IOException + */ + private void handleRequest(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { + final AtomicBoolean moreDataWillBeSent = new AtomicBoolean(true); + Subscription requestsSubscription = null; + + /* ensure we aren't allowing more connections than we want */ + int numberConnections = concurrentConnections.incrementAndGet(); + try { + int maxNumberConnectionsAllowed = maxConcurrentConnections.get(); + if (numberConnections > maxNumberConnectionsAllowed) { + response.sendError(503, "MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed); + } else { + int delay = getDelayFromHttpRequest(request); + + /* initialize response */ + response.setHeader("Content-Type", "text/event-stream;charset=UTF-8"); + response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate"); + response.setHeader("Pragma", "no-cache"); + + final PrintWriter writer = response.getWriter(); + + //since the sample stream is based on Observable.interval, events will get published on an RxComputation thread + //since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext + requestsSubscription = HystrixRequestEventsStream.getInstance() + .observe() + .observeOn(Schedulers.io()) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + logger.error("HystrixRequestEventsSseServlet received unexpected OnCompleted from request stream"); + moreDataWillBeSent.set(false); + } + + @Override + public void onError(Throwable e) { + moreDataWillBeSent.set(false); + } + + @Override + public void onNext(HystrixRequestEvents requestEvents) { + if (requestEvents != null) { + requestQueue.offer(requestEvents); + } + } + }); + + while (moreDataWillBeSent.get() && !isDestroyed) { + try { + if (requestQueue.isEmpty()) { + try { + writer.print(PING); + writer.flush(); + } catch (Throwable t) { + throw new IOException("Exception while writing ping"); + } + + if (writer.checkError()) { + throw new IOException("io error"); + } + } else { + List l = new ArrayList(); + requestQueue.drainTo(l); + String requestEventsAsStr = convertToString(l); + //try { + //} catch (IOException ioe) { + // //exception while converting String to JSON + // logger.error("Error converting configuration to JSON ", ioe); + //} + if (requestEventsAsStr != null) { + try { + writer.print("data: " + requestEventsAsStr + "\n\n"); + // explicitly check for client disconnect - PrintWriter does not throw exceptions + if (writer.checkError()) { + throw new IOException("io error"); + } + writer.flush(); + } catch (IOException ioe) { + moreDataWillBeSent.set(false); + } + } + } + Thread.sleep(delay); + } catch (InterruptedException e) { + moreDataWillBeSent.set(false); + } + } + } + } finally { + concurrentConnections.decrementAndGet(); + if (requestsSubscription != null && !requestsSubscription.isUnsubscribed()) { + requestsSubscription.unsubscribe(); + } + } + } +} + diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java new file mode 100644 index 000000000..a47be40cf --- /dev/null +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServlet.java @@ -0,0 +1,211 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.sample.stream; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.config.DynamicIntProperty; +import com.netflix.config.DynamicPropertyFactory; +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.config.HystrixCollapserConfiguration; +import com.netflix.hystrix.config.HystrixCommandConfiguration; +import com.netflix.hystrix.config.HystrixConfiguration; +import com.netflix.hystrix.config.HystrixConfigurationStream; +import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; +import rx.Observable; +import rx.functions.Func1; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Streams Hystrix config in text/event-stream format. + *

+ * Install by: + *

+ * 1) Including hystrix-metrics-event-stream-*.jar in your classpath. + *

+ * 2) Adding the following to web.xml: + *

{@code
+ * 
+ *  
+ *  HystrixConfigSseServlet
+ *  HystrixConfigSseServlet
+ *  com.netflix.hystrix.contrib.sample.stream.HystrixConfigSseServlet
+ * 
+ * 
+ *  HystrixConfigSseServlet
+ *  /hystrix/config.stream
+ * 
+ * } 
+ */ +public class HystrixConfigSseServlet extends HystrixSampleSseServlet { + + private static final long serialVersionUID = -3599771169762858235L; + + private static final int DEFAULT_ONNEXT_DELAY_IN_MS = 10000; + + private JsonFactory jsonFactory = new JsonFactory(); + + /* used to track number of connections and throttle */ + private static AtomicInteger concurrentConnections = new AtomicInteger(0); + private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5); + + public HystrixConfigSseServlet() { + super(new Func1>() { + @Override + public Observable call(Integer delay) { + return new HystrixConfigurationStream(delay).observe(); + } + }); + } + + /* package-private */ HystrixConfigSseServlet(Func1> createStream) { + super(createStream); + } + + @Override + int getDefaultDelayInMilliseconds() { + return DEFAULT_ONNEXT_DELAY_IN_MS; + } + + @Override + int getMaxNumberConcurrentConnectionsAllowed() { + return maxConcurrentConnections.get(); + } + + @Override + int getNumberCurrentConnections() { + return concurrentConnections.get(); + } + + @Override + protected int incrementAndGetCurrentConcurrentConnections() { + return concurrentConnections.incrementAndGet(); + } + + @Override + protected void decrementCurrentConcurrentConnections() { + concurrentConnections.decrementAndGet(); + } + + private void writeCommandConfigJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandConfiguration commandConfig) throws IOException { + json.writeObjectFieldStart(key.name()); + json.writeStringField("threadPoolKey", commandConfig.getThreadPoolKey().name()); + json.writeStringField("groupKey", commandConfig.getGroupKey().name()); + json.writeObjectFieldStart("execution"); + HystrixCommandConfiguration.HystrixCommandExecutionConfig executionConfig = commandConfig.getExecutionConfig(); + json.writeStringField("isolationStrategy", executionConfig.getIsolationStrategy().name()); + json.writeStringField("threadPoolKeyOverride", executionConfig.getThreadPoolKeyOverride()); + json.writeBooleanField("requestCacheEnabled", executionConfig.isRequestCacheEnabled()); + json.writeBooleanField("requestLogEnabled", executionConfig.isRequestLogEnabled()); + json.writeBooleanField("timeoutEnabled", executionConfig.isTimeoutEnabled()); + json.writeBooleanField("fallbackEnabled", executionConfig.isFallbackEnabled()); + json.writeNumberField("timeoutInMilliseconds", executionConfig.getTimeoutInMilliseconds()); + json.writeNumberField("semaphoreSize", executionConfig.getSemaphoreMaxConcurrentRequests()); + json.writeNumberField("fallbackSemaphoreSize", executionConfig.getFallbackMaxConcurrentRequest()); + json.writeBooleanField("threadInterruptOnTimeout", executionConfig.isThreadInterruptOnTimeout()); + json.writeEndObject(); + json.writeObjectFieldStart("metrics"); + HystrixCommandConfiguration.HystrixCommandMetricsConfig metricsConfig = commandConfig.getMetricsConfig(); + json.writeNumberField("healthBucketSizeInMs", metricsConfig.getHealthIntervalInMilliseconds()); + json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); + json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); + json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + json.writeObjectFieldStart("circuitBreaker"); + HystrixCommandConfiguration.HystrixCommandCircuitBreakerConfig circuitBreakerConfig = commandConfig.getCircuitBreakerConfig(); + json.writeBooleanField("enabled", circuitBreakerConfig.isEnabled()); + json.writeBooleanField("isForcedOpen", circuitBreakerConfig.isForceOpen()); + json.writeBooleanField("isForcedClosed", circuitBreakerConfig.isForceOpen()); + json.writeNumberField("requestVolumeThreshold", circuitBreakerConfig.getRequestVolumeThreshold()); + json.writeNumberField("errorPercentageThreshold", circuitBreakerConfig.getErrorThresholdPercentage()); + json.writeNumberField("sleepInMilliseconds", circuitBreakerConfig.getSleepWindowInMilliseconds()); + json.writeEndObject(); + json.writeEndObject(); + } + + private void writeThreadPoolConfigJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolConfiguration threadPoolConfig) throws IOException { + json.writeObjectFieldStart(threadPoolKey.name()); + json.writeNumberField("coreSize", threadPoolConfig.getCoreSize()); + json.writeNumberField("maxQueueSize", threadPoolConfig.getMaxQueueSize()); + json.writeNumberField("queueRejectionThreshold", threadPoolConfig.getQueueRejectionThreshold()); + json.writeNumberField("keepAliveTimeInMinutes", threadPoolConfig.getKeepAliveTimeInMinutes()); + json.writeNumberField("counterBucketSizeInMilliseconds", threadPoolConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", threadPoolConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + } + + private void writeCollapserConfigJson(JsonGenerator json, HystrixCollapserKey collapserKey, HystrixCollapserConfiguration collapserConfig) throws IOException { + json.writeObjectFieldStart(collapserKey.name()); + json.writeNumberField("maxRequestsInBatch", collapserConfig.getMaxRequestsInBatch()); + json.writeNumberField("timerDelayInMilliseconds", collapserConfig.getTimerDelayInMilliseconds()); + json.writeBooleanField("requestCacheEnabled", collapserConfig.isRequestCacheEnabled()); + json.writeObjectFieldStart("metrics"); + HystrixCollapserConfiguration.CollapserMetricsConfig metricsConfig = collapserConfig.getCollapserMetricsConfig(); + json.writeNumberField("percentileBucketSizeInMilliseconds", metricsConfig.getRollingPercentileBucketSizeInMilliseconds()); + json.writeNumberField("percentileBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeBooleanField("percentileEnabled", metricsConfig.isRollingPercentileEnabled()); + json.writeNumberField("counterBucketSizeInMilliseconds", metricsConfig.getRollingCounterBucketSizeInMilliseconds()); + json.writeNumberField("counterBucketCount", metricsConfig.getRollingCounterNumberOfBuckets()); + json.writeEndObject(); + json.writeEndObject(); + } + + @Override + protected String convertToString(HystrixConfiguration config) throws IOException { + StringWriter jsonString = new StringWriter(); + JsonGenerator json = jsonFactory.createGenerator(jsonString); + + json.writeStartObject(); + json.writeStringField("type", "HystrixConfig"); + json.writeObjectFieldStart("commands"); + for (Map.Entry entry: config.getCommandConfig().entrySet()) { + final HystrixCommandKey key = entry.getKey(); + final HystrixCommandConfiguration commandConfig = entry.getValue(); + writeCommandConfigJson(json, key, commandConfig); + + } + json.writeEndObject(); + + json.writeObjectFieldStart("threadpools"); + for (Map.Entry entry: config.getThreadPoolConfig().entrySet()) { + final HystrixThreadPoolKey threadPoolKey = entry.getKey(); + final HystrixThreadPoolConfiguration threadPoolConfig = entry.getValue(); + writeThreadPoolConfigJson(json, threadPoolKey, threadPoolConfig); + } + json.writeEndObject(); + + json.writeObjectFieldStart("collapsers"); + for (Map.Entry entry: config.getCollapserConfig().entrySet()) { + final HystrixCollapserKey collapserKey = entry.getKey(); + final HystrixCollapserConfiguration collapserConfig = entry.getValue(); + writeCollapserConfigJson(json, collapserKey, collapserConfig); + } + json.writeEndObject(); + json.writeEndObject(); + json.close(); + + return jsonString.getBuffer().toString(); + } +} + diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java new file mode 100644 index 000000000..1d39bea80 --- /dev/null +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java @@ -0,0 +1,202 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.sample.stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Observable; +import rx.Subscriber; +import rx.Subscription; +import rx.functions.Func1; +import rx.schedulers.Schedulers; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + */ +public abstract class HystrixSampleSseServlet extends HttpServlet { + + private static final Logger logger = LoggerFactory.getLogger(HystrixSampleSseServlet.class); + + /* Set to true upon shutdown, so it's OK to be shared among all SampleSseServlets */ + private static volatile boolean isDestroyed = false; + + private static final String DELAY_REQ_PARAM_NAME = "delay"; + + private final Func1> createStream; + + protected HystrixSampleSseServlet(Func1> createStream) { + this.createStream = createStream; + } + + abstract int getDefaultDelayInMilliseconds(); + + abstract int getMaxNumberConcurrentConnectionsAllowed(); + + abstract int getNumberCurrentConnections(); + + protected abstract int incrementAndGetCurrentConcurrentConnections(); + + protected abstract void decrementCurrentConcurrentConnections(); + + protected abstract String convertToString(SampleData sampleData) throws IOException; + + /** + * Handle incoming GETs + */ + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + if (isDestroyed) { + response.sendError(503, "Service has been shut down."); + } else { + handleRequest(request, response); + } + } + + /* package-private */ + int getDelayFromHttpRequest(HttpServletRequest req) { + try { + String delay = req.getParameter(DELAY_REQ_PARAM_NAME); + if (delay != null) { + return Math.max(Integer.parseInt(delay), 1); + } + } catch (Throwable ex) { + //silently fail + } + return getDefaultDelayInMilliseconds(); + } + + /** + * WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing + * a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at + * shutdown, perhaps from some other servlet's destroy() method. + */ + public static void shutdown() { + isDestroyed = true; + } + + @Override + public void init() throws ServletException { + isDestroyed = false; + } + + /** + * Handle servlet being undeployed by gracefully releasing connections so poller threads stop. + */ + @Override + public void destroy() { + /* set marker so the loops can break out */ + isDestroyed = true; + super.destroy(); + } + + /** + * - maintain an open connection with the client + * - on initial connection send latest data of each requested event type + * - subsequently send all changes for each requested event type + * + * @param request incoming HTTP Request + * @param response outgoing HTTP Response (as a streaming response) + * @throws javax.servlet.ServletException + * @throws java.io.IOException + */ + private void handleRequest(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { + final AtomicBoolean moreDataWillBeSent = new AtomicBoolean(true); + Subscription sampleSubscription = null; + + /* ensure we aren't allowing more connections than we want */ + int numberConnections = incrementAndGetCurrentConcurrentConnections(); + try { + int maxNumberConnectionsAllowed = getMaxNumberConcurrentConnectionsAllowed(); //may change at runtime, so look this up for each request + if (numberConnections > maxNumberConnectionsAllowed) { + response.sendError(503, "MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed); + } else { + int delay = getDelayFromHttpRequest(request); + + /* initialize response */ + response.setHeader("Content-Type", "text/event-stream;charset=UTF-8"); + response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate"); + response.setHeader("Pragma", "no-cache"); + + final PrintWriter writer = response.getWriter(); + + Observable sampledStream = createStream.call(delay); + + //since the sample stream is based on Observable.interval, events will get published on an RxComputation thread + //since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext + sampleSubscription = sampledStream + .observeOn(Schedulers.io()) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + logger.error("HystrixSampleSseServlet: (" + getClass().getSimpleName() + ") received unexpected OnCompleted from sample stream"); + moreDataWillBeSent.set(false); + } + + @Override + public void onError(Throwable e) { + moreDataWillBeSent.set(false); + } + + @Override + public void onNext(SampleData sampleData) { + if (sampleData != null) { + String sampleDataAsStr = null; + try { + sampleDataAsStr = convertToString(sampleData); + } catch (IOException ioe) { + //exception while converting String to JSON + logger.error("Error converting configuration to JSON ", ioe); + } + if (sampleDataAsStr != null) { + try { + writer.print("data: " + sampleDataAsStr + "\n\n"); + // explicitly check for client disconnect - PrintWriter does not throw exceptions + if (writer.checkError()) { + throw new IOException("io error"); + } + writer.flush(); + } catch (IOException ioe) { + moreDataWillBeSent.set(false); + } + } + } + } + }); + + while (moreDataWillBeSent.get() && !isDestroyed) { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + moreDataWillBeSent.set(false); + } + } + } + } finally { + decrementCurrentConcurrentConnections(); + if (sampleSubscription != null && !sampleSubscription.isUnsubscribed()) { + sampleSubscription.unsubscribe(); + } + } + } +} + diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java new file mode 100644 index 000000000..84e8624da --- /dev/null +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixUtilizationSseServlet.java @@ -0,0 +1,152 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.sample.stream; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.config.DynamicIntProperty; +import com.netflix.config.DynamicPropertyFactory; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.metric.sample.HystrixCommandUtilization; +import com.netflix.hystrix.metric.sample.HystrixThreadPoolUtilization; +import com.netflix.hystrix.metric.sample.HystrixUtilization; +import com.netflix.hystrix.metric.sample.HystrixUtilizationStream; +import rx.Observable; +import rx.functions.Func1; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Streams Hystrix config in text/event-stream format. + *

+ * Install by: + *

+ * 1) Including hystrix-metrics-event-stream-*.jar in your classpath. + *

+ * 2) Adding the following to web.xml: + *

{@code
+ * 
+ *  
+ *  HystrixUtilizationSseServlet
+ *  HystrixUtilizationSseServlet
+ *  com.netflix.hystrix.contrib.sample.stream.HystrixUtilizationSseServlet
+ * 
+ * 
+ *  HystrixUtilizationSseServlet
+ *  /hystrix/utilization.stream
+ * 
+ * } 
+ */ +public class HystrixUtilizationSseServlet extends HystrixSampleSseServlet { + + private static final long serialVersionUID = -7812908330777694972L; + + private static final int DEFAULT_ONNEXT_DELAY_IN_MS = 100; + + private JsonFactory jsonFactory = new JsonFactory(); + + /* used to track number of connections and throttle */ + private static AtomicInteger concurrentConnections = new AtomicInteger(0); + private static DynamicIntProperty maxConcurrentConnections = + DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5); + + public HystrixUtilizationSseServlet() { + super(new Func1>() { + @Override + public Observable call(Integer delay) { + return new HystrixUtilizationStream(delay).observe(); + } + }); + } + + /* package-private */ HystrixUtilizationSseServlet(Func1> createStream) { + super(createStream); + } + + @Override + int getDefaultDelayInMilliseconds() { + return DEFAULT_ONNEXT_DELAY_IN_MS; + } + + @Override + int getMaxNumberConcurrentConnectionsAllowed() { + return maxConcurrentConnections.get(); + } + + @Override + int getNumberCurrentConnections() { + return concurrentConnections.get(); + } + + @Override + protected int incrementAndGetCurrentConcurrentConnections() { + return concurrentConnections.incrementAndGet(); + } + + @Override + protected void decrementCurrentConcurrentConnections() { + concurrentConnections.decrementAndGet(); + } + + private void writeCommandUtilizationJson(JsonGenerator json, HystrixCommandKey key, HystrixCommandUtilization utilization) throws IOException { + json.writeObjectFieldStart(key.name()); + json.writeNumberField("activeCount", utilization.getConcurrentCommandCount()); + json.writeEndObject(); + } + + private void writeThreadPoolUtilizationJson(JsonGenerator json, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolUtilization utilization) throws IOException { + json.writeObjectFieldStart(threadPoolKey.name()); + json.writeNumberField("activeCount", utilization.getCurrentActiveCount()); + json.writeNumberField("queueSize", utilization.getCurrentQueueSize()); + json.writeNumberField("corePoolSize", utilization.getCurrentCorePoolSize()); + json.writeNumberField("poolSize", utilization.getCurrentPoolSize()); + json.writeEndObject(); + } + + @Override + protected String convertToString(HystrixUtilization utilization) throws IOException { + StringWriter jsonString = new StringWriter(); + JsonGenerator json = jsonFactory.createGenerator(jsonString); + + json.writeStartObject(); + json.writeStringField("type", "HystrixUtilization"); + json.writeObjectFieldStart("commands"); + for (Map.Entry entry: utilization.getCommandUtilizationMap().entrySet()) { + final HystrixCommandKey key = entry.getKey(); + final HystrixCommandUtilization commandUtilization = entry.getValue(); + writeCommandUtilizationJson(json, key, commandUtilization); + + } + json.writeEndObject(); + + json.writeObjectFieldStart("threadpools"); + for (Map.Entry entry: utilization.getThreadPoolUtilizationMap().entrySet()) { + final HystrixThreadPoolKey threadPoolKey = entry.getKey(); + final HystrixThreadPoolUtilization threadPoolUtilization = entry.getValue(); + writeThreadPoolUtilizationJson(json, threadPoolKey, threadPoolUtilization); + } + json.writeEndObject(); + json.writeEndObject(); + json.close(); + + return jsonString.getBuffer().toString(); + } +} + diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java b/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java new file mode 100644 index 000000000..25effdc77 --- /dev/null +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/test/java/com/netflix/hystrix/contrib/sample/stream/HystrixConfigSseServletTest.java @@ -0,0 +1,347 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.sample.stream; + +import com.netflix.hystrix.config.HystrixConfiguration; +import com.netflix.hystrix.config.HystrixConfigurationStream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import rx.Observable; +import rx.Subscriber; +import rx.functions.Func1; +import rx.schedulers.Schedulers; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class HystrixConfigSseServletTest { + + @Mock HttpServletRequest mockReq; + @Mock HttpServletResponse mockResp; + @Mock HystrixConfiguration mockConfig; + @Mock PrintWriter mockPrintWriter; + + HystrixConfigSseServlet servlet; + + private final Observable streamOfOnNexts = Observable.interval(100, TimeUnit.MILLISECONDS).map(new Func1() { + @Override + public HystrixConfiguration call(Long timestamp) { + return mockConfig; + } + }); + + private Func1> generateStream(final Observable o) { + return new Func1>() { + @Override + public Observable call(Integer integer) { + return o; + } + }; + } + + private final Observable streamOfOnNextThenOnError = Observable.create(new Observable.OnSubscribe() { + @Override + public void call(Subscriber subscriber) { + try { + Thread.sleep(100); + subscriber.onNext(mockConfig); + Thread.sleep(100); + subscriber.onError(new RuntimeException("stream failure")); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + }).subscribeOn(Schedulers.computation()); + + private final Observable streamOfOnNextThenOnCompleted = Observable.create(new Observable.OnSubscribe() { + @Override + public void call(Subscriber subscriber) { + try { + Thread.sleep(100); + subscriber.onNext(mockConfig); + Thread.sleep(100); + subscriber.onCompleted(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + }).subscribeOn(Schedulers.computation()); + + @Before + public void init() { + MockitoAnnotations.initMocks(this); + + } + + @After + public void tearDown() { + servlet.destroy(); + servlet.shutdown(); + } + + @Test + public void shutdownServletShouldRejectRequests() throws ServletException, IOException { + servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts)); + try { + servlet.init(); + } catch (ServletException ex) { + + } + + servlet.shutdown(); + + servlet.doGet(mockReq, mockResp); + + verify(mockResp).sendError(503, "Service has been shut down."); + + } + + @Test + public void testConfigDataWithInfiniteOnNextStream() throws IOException, InterruptedException { + servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts)); + try { + servlet.init(); + } catch (ServletException ex) { + + } + + final AtomicInteger writes = new AtomicInteger(0); + + when(mockReq.getParameter("delay")).thenReturn("100"); + when(mockResp.getWriter()).thenReturn(mockPrintWriter); + Mockito.doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + String written = (String) invocation.getArguments()[0]; + System.out.println("ARG : " + written); + + if (!written.contains("ping")) { + writes.incrementAndGet(); + } + return null; + } + }).when(mockPrintWriter).print(Mockito.anyString()); + + Runnable simulateClient = new Runnable() { + @Override + public void run() { + try { + servlet.doGet(mockReq, mockResp); + } catch (ServletException ex) { + fail(ex.getMessage()); + } catch (IOException ex) { + fail(ex.getMessage()); + } + } + }; + + Thread t = new Thread(simulateClient); + System.out.println("Starting thread : " + t.getName()); + t.start(); + System.out.println("Started thread : " + t.getName()); + + try { + Thread.sleep(1000); + System.out.println("Woke up from sleep : " + Thread.currentThread().getName()); + } catch (InterruptedException ex) { + fail(ex.getMessage()); + } + + System.out.println("About to interrupt"); + t.interrupt(); + System.out.println("Done interrupting"); + + Thread.sleep(100); + + System.out.println("WRITES : " + writes.get()); + assertEquals(9, writes.get()); + assertEquals(0, servlet.getNumberCurrentConnections()); + } + + @Test + public void testConfigDataWithStreamOnError() throws IOException, InterruptedException { + servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNextThenOnError)); + try { + servlet.init(); + } catch (ServletException ex) { + + } + + final AtomicInteger writes = new AtomicInteger(0); + + when(mockReq.getParameter("delay")).thenReturn("100"); + when(mockResp.getWriter()).thenReturn(mockPrintWriter); + Mockito.doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + String written = (String) invocation.getArguments()[0]; + System.out.println("ARG : " + written); + + if (!written.contains("ping")) { + writes.incrementAndGet(); + } + return null; + } + }).when(mockPrintWriter).print(Mockito.anyString()); + + Runnable simulateClient = new Runnable() { + @Override + public void run() { + try { + servlet.doGet(mockReq, mockResp); + } catch (ServletException ex) { + fail(ex.getMessage()); + } catch (IOException ex) { + fail(ex.getMessage()); + } + } + }; + + Thread t = new Thread(simulateClient); + t.start(); + + try { + Thread.sleep(1000); + System.out.println(System.currentTimeMillis() + " Woke up from sleep : " + Thread.currentThread().getName()); + } catch (InterruptedException ex) { + fail(ex.getMessage()); + } + + assertEquals(1, writes.get()); + assertEquals(0, servlet.getNumberCurrentConnections()); + } + + @Test + public void testConfigDataWithStreamOnCompleted() throws IOException, InterruptedException { + servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNextThenOnCompleted)); + try { + servlet.init(); + } catch (ServletException ex) { + + } + + final AtomicInteger writes = new AtomicInteger(0); + + when(mockReq.getParameter("delay")).thenReturn("100"); + when(mockResp.getWriter()).thenReturn(mockPrintWriter); + Mockito.doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + String written = (String) invocation.getArguments()[0]; + System.out.println("ARG : " + written); + + if (!written.contains("ping")) { + writes.incrementAndGet(); + } + return null; + } + }).when(mockPrintWriter).print(Mockito.anyString()); + + Runnable simulateClient = new Runnable() { + @Override + public void run() { + try { + servlet.doGet(mockReq, mockResp); + } catch (ServletException ex) { + fail(ex.getMessage()); + } catch (IOException ex) { + fail(ex.getMessage()); + } + } + }; + + Thread t = new Thread(simulateClient); + t.start(); + + try { + Thread.sleep(1000); + System.out.println(System.currentTimeMillis() + " Woke up from sleep : " + Thread.currentThread().getName()); + } catch (InterruptedException ex) { + fail(ex.getMessage()); + } + + assertEquals(1, writes.get()); + assertEquals(0, servlet.getNumberCurrentConnections()); + } + + @Test + public void testConfigDataWithIoExceptionOnWrite() throws IOException, InterruptedException { + servlet = new HystrixConfigSseServlet(generateStream(streamOfOnNexts)); + try { + servlet.init(); + } catch (ServletException ex) { + + } + + final AtomicInteger writes = new AtomicInteger(0); + + when(mockReq.getParameter("delay")).thenReturn("100"); + when(mockResp.getWriter()).thenReturn(mockPrintWriter); + Mockito.doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + String written = (String) invocation.getArguments()[0]; + System.out.println("ARG : " + written); + + if (!written.contains("ping")) { + writes.incrementAndGet(); + } + throw new IOException("simulated IO Exception"); + } + }).when(mockPrintWriter).print(Mockito.anyString()); + + Runnable simulateClient = new Runnable() { + @Override + public void run() { + try { + servlet.doGet(mockReq, mockResp); + } catch (ServletException ex) { + fail(ex.getMessage()); + } catch (IOException ex) { + fail(ex.getMessage()); + } + } + }; + + Thread t = new Thread(simulateClient); + t.start(); + + try { + Thread.sleep(1000); + System.out.println(System.currentTimeMillis() + " Woke up from sleep : " + Thread.currentThread().getName()); + } catch (InterruptedException ex) { + fail(ex.getMessage()); + } + + assertTrue(writes.get() <= 2); + assertEquals(0, servlet.getNumberCurrentConnections()); + } +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixInvokableInfo.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixInvokableInfo.java index b1b405de5..6e8481a42 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixInvokableInfo.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixInvokableInfo.java @@ -19,52 +19,51 @@ public interface HystrixInvokableInfo { - public HystrixCommandGroupKey getCommandGroup(); + HystrixCommandGroupKey getCommandGroup(); - public HystrixCommandKey getCommandKey(); + HystrixCommandKey getCommandKey(); - public HystrixThreadPoolKey getThreadPoolKey(); + HystrixThreadPoolKey getThreadPoolKey(); - public HystrixCommandMetrics getMetrics(); + HystrixCommandMetrics getMetrics(); - public HystrixCommandProperties getProperties(); + HystrixCommandProperties getProperties(); - public boolean isCircuitBreakerOpen(); + boolean isCircuitBreakerOpen(); - public boolean isExecutionComplete(); + boolean isExecutionComplete(); - public boolean isExecutedInThread(); + boolean isExecutedInThread(); - public boolean isSuccessfulExecution(); + boolean isSuccessfulExecution(); - public boolean isFailedExecution(); + boolean isFailedExecution(); - public Throwable getFailedExecutionException(); + Throwable getFailedExecutionException(); - public boolean isResponseFromFallback(); + boolean isResponseFromFallback(); - public boolean isResponseTimedOut(); + boolean isResponseTimedOut(); - public boolean isResponseShortCircuited(); + boolean isResponseShortCircuited(); - public boolean isResponseFromCache(); + boolean isResponseFromCache(); - public boolean isResponseRejected(); + boolean isResponseRejected(); boolean isResponseSemaphoreRejected(); boolean isResponseThreadPoolRejected(); - public List getExecutionEvents(); + List getExecutionEvents(); - public int getNumberEmissions(); + int getNumberEmissions(); - public int getNumberFallbackEmissions(); + int getNumberFallbackEmissions(); int getNumberCollapsed(); - public int getExecutionTimeInMilliseconds(); - - public long getCommandRunStartTimeInNanos(); + int getExecutionTimeInMilliseconds(); + long getCommandRunStartTimeInNanos(); } \ No newline at end of file diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixRequestLog.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixRequestLog.java index 1779b7e1b..e94586556 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixRequestLog.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixRequestLog.java @@ -15,6 +15,15 @@ */ package com.netflix.hystrix; +import com.netflix.hystrix.metric.HystrixRequestEventsStream; +import com.netflix.hystrix.strategy.HystrixPlugins; +import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableHolder; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -24,15 +33,6 @@ import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.netflix.hystrix.strategy.HystrixPlugins; -import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; -import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; -import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableHolder; -import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle; - /** * Log of {@link HystrixCommand} executions and events during the current request. */ @@ -57,9 +57,10 @@ public HystrixRequestLog initialValue() { } public void shutdown(HystrixRequestLog value) { - // nothing to shutdown + //write this value to the Request stream + HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread(); + HystrixRequestEventsStream.getInstance().write(requestContext, value.getAllExecutedCommands()); } - }); /** diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixCollapserConfiguration.java b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixCollapserConfiguration.java new file mode 100644 index 000000000..d1eac15b2 --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixCollapserConfiguration.java @@ -0,0 +1,111 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.config; + +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCollapserProperties; + +public class HystrixCollapserConfiguration { + private final HystrixCollapserKey collapserKey; + private final int maxRequestsInBatch; + private final int timerDelayInMilliseconds; + private final boolean requestCacheEnabled; + private final CollapserMetricsConfig collapserMetricsConfig; + + public HystrixCollapserConfiguration(HystrixCollapserKey collapserKey, int maxRequestsInBatch, int timerDelayInMilliseconds, + boolean requestCacheEnabled, CollapserMetricsConfig collapserMetricsConfig) { + this.collapserKey = collapserKey; + this.maxRequestsInBatch = maxRequestsInBatch; + this.timerDelayInMilliseconds = timerDelayInMilliseconds; + this.requestCacheEnabled = requestCacheEnabled; + this.collapserMetricsConfig = collapserMetricsConfig; + } + + public static HystrixCollapserConfiguration sample(HystrixCollapserKey collapserKey, HystrixCollapserProperties collapserProperties) { + CollapserMetricsConfig collapserMetricsConfig = new CollapserMetricsConfig( + collapserProperties.metricsRollingPercentileWindowBuckets().get(), + collapserProperties.metricsRollingPercentileWindowInMilliseconds().get(), + collapserProperties.metricsRollingPercentileEnabled().get(), + collapserProperties.metricsRollingStatisticalWindowBuckets().get(), + collapserProperties.metricsRollingStatisticalWindowInMilliseconds().get() + ); + + return new HystrixCollapserConfiguration( + collapserKey, + collapserProperties.maxRequestsInBatch().get(), + collapserProperties.timerDelayInMilliseconds().get(), + collapserProperties.requestCacheEnabled().get(), + collapserMetricsConfig + ); + } + + public HystrixCollapserKey getCollapserKey() { + return collapserKey; + } + + public int getMaxRequestsInBatch() { + return maxRequestsInBatch; + } + + public int getTimerDelayInMilliseconds() { + return timerDelayInMilliseconds; + } + + public boolean isRequestCacheEnabled() { + return requestCacheEnabled; + } + + public CollapserMetricsConfig getCollapserMetricsConfig() { + return collapserMetricsConfig; + } + + public static class CollapserMetricsConfig { + private final int rollingPercentileNumberOfBuckets; + private final int rollingPercentileBucketSizeInMilliseconds; + private final boolean rollingPercentileEnabled; + private final int rollingCounterNumberOfBuckets; + private final int rollingCounterBucketSizeInMilliseconds; + + public CollapserMetricsConfig(int rollingPercentileNumberOfBuckets, int rollingPercentileBucketSizeInMilliseconds, boolean rollingPercentileEnabled, + int rollingCounterNumberOfBuckets, int rollingCounterBucketSizeInMilliseconds) { + this.rollingPercentileNumberOfBuckets = rollingCounterNumberOfBuckets; + this.rollingPercentileBucketSizeInMilliseconds = rollingPercentileBucketSizeInMilliseconds; + this.rollingPercentileEnabled = rollingPercentileEnabled; + this.rollingCounterNumberOfBuckets = rollingCounterNumberOfBuckets; + this.rollingCounterBucketSizeInMilliseconds = rollingCounterBucketSizeInMilliseconds; + } + + public int getRollingPercentileNumberOfBuckets() { + return rollingPercentileNumberOfBuckets; + } + + public int getRollingPercentileBucketSizeInMilliseconds() { + return rollingPercentileBucketSizeInMilliseconds; + } + + public boolean isRollingPercentileEnabled() { + return rollingPercentileEnabled; + } + + public int getRollingCounterNumberOfBuckets() { + return rollingCounterNumberOfBuckets; + } + + public int getRollingCounterBucketSizeInMilliseconds() { + return rollingCounterBucketSizeInMilliseconds; + } + } +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixCommandConfiguration.java b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixCommandConfiguration.java new file mode 100644 index 000000000..1c6a27dac --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixCommandConfiguration.java @@ -0,0 +1,258 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.config; + +import com.netflix.hystrix.HystrixCommandGroupKey; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixCommandProperties; +import com.netflix.hystrix.HystrixThreadPoolKey; + +public class HystrixCommandConfiguration { + //The idea is for this object to be serialized off-box. For future-proofing, I'm adding a version so that changing config over time can be handled gracefully + private static final String VERSION = "1"; + private final HystrixCommandKey commandKey; + private final HystrixThreadPoolKey threadPoolKey; + private final HystrixCommandGroupKey groupKey; + private final HystrixCommandExecutionConfig executionConfig; + private final HystrixCommandCircuitBreakerConfig circuitBreakerConfig; + private final HystrixCommandMetricsConfig metricsConfig; + + private HystrixCommandConfiguration(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, + HystrixCommandExecutionConfig executionConfig, + HystrixCommandCircuitBreakerConfig circuitBreakerConfig, + HystrixCommandMetricsConfig metricsConfig) { + this.commandKey = commandKey; + this.threadPoolKey = threadPoolKey; + this.groupKey = groupKey; + this.executionConfig = executionConfig; + this.circuitBreakerConfig = circuitBreakerConfig; + this.metricsConfig = metricsConfig; + } + + public static HystrixCommandConfiguration sample(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, + HystrixCommandGroupKey groupKey, HystrixCommandProperties commandProperties) { + HystrixCommandExecutionConfig executionConfig = new HystrixCommandExecutionConfig( + commandProperties.executionIsolationSemaphoreMaxConcurrentRequests().get(), + commandProperties.executionIsolationStrategy().get(), + commandProperties.executionIsolationThreadInterruptOnTimeout().get(), + commandProperties.executionIsolationThreadPoolKeyOverride().get(), + commandProperties.executionTimeoutEnabled().get(), + commandProperties.executionTimeoutInMilliseconds().get(), + commandProperties.fallbackEnabled().get(), + commandProperties.fallbackIsolationSemaphoreMaxConcurrentRequests().get(), + commandProperties.requestCacheEnabled().get(), + commandProperties.requestLogEnabled().get() + ); + + HystrixCommandCircuitBreakerConfig circuitBreakerConfig = new HystrixCommandCircuitBreakerConfig( + commandProperties.circuitBreakerEnabled().get(), + commandProperties.circuitBreakerErrorThresholdPercentage().get(), + commandProperties.circuitBreakerForceClosed().get(), + commandProperties.circuitBreakerForceOpen().get(), + commandProperties.circuitBreakerRequestVolumeThreshold().get(), + commandProperties.circuitBreakerSleepWindowInMilliseconds().get() + ); + + HystrixCommandMetricsConfig metricsConfig = new HystrixCommandMetricsConfig( + commandProperties.metricsHealthSnapshotIntervalInMilliseconds().get(), + commandProperties.metricsRollingPercentileEnabled().get(), + commandProperties.metricsRollingPercentileWindowBuckets().get(), + commandProperties.metricsRollingPercentileWindowInMilliseconds().get(), + commandProperties.metricsRollingStatisticalWindowBuckets().get(), + commandProperties.metricsRollingStatisticalWindowInMilliseconds().get() + ); + + return new HystrixCommandConfiguration( + commandKey, threadPoolKey, groupKey, executionConfig, circuitBreakerConfig, metricsConfig); + } + + public HystrixThreadPoolKey getThreadPoolKey() { + return threadPoolKey; + } + + public HystrixCommandGroupKey getGroupKey() { + return groupKey; + } + + public HystrixCommandExecutionConfig getExecutionConfig() { + return executionConfig; + } + + public HystrixCommandCircuitBreakerConfig getCircuitBreakerConfig() { + return circuitBreakerConfig; + } + + public HystrixCommandMetricsConfig getMetricsConfig() { + return metricsConfig; + } + + public static class HystrixCommandCircuitBreakerConfig { + private final boolean enabled; + private final int errorThresholdPercentage; + private final boolean forceClosed; + private final boolean forceOpen; + private final int requestVolumeThreshold; + private final int sleepWindowInMilliseconds; + + public HystrixCommandCircuitBreakerConfig(boolean enabled, int errorThresholdPercentage, boolean forceClosed, + boolean forceOpen, int requestVolumeThreshold, int sleepWindowInMilliseconds) { + this.enabled = enabled; + this.errorThresholdPercentage = errorThresholdPercentage; + this.forceClosed = forceClosed; + this.forceOpen = forceOpen; + this.requestVolumeThreshold = requestVolumeThreshold; + this.sleepWindowInMilliseconds = sleepWindowInMilliseconds; + } + + public boolean isEnabled() { + return enabled; + } + + public int getErrorThresholdPercentage() { + return errorThresholdPercentage; + } + + public boolean isForceClosed() { + return forceClosed; + } + + public boolean isForceOpen() { + return forceOpen; + } + + public int getRequestVolumeThreshold() { + return requestVolumeThreshold; + } + + public int getSleepWindowInMilliseconds() { + return sleepWindowInMilliseconds; + } + } + + public static class HystrixCommandExecutionConfig { + private final int semaphoreMaxConcurrentRequests; + private final HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy; + private final boolean threadInterruptOnTimeout; + private final String threadPoolKeyOverride; + private final boolean timeoutEnabled; + private final int timeoutInMilliseconds; + private final boolean fallbackEnabled; + private final int fallbackMaxConcurrentRequest; + private final boolean requestCacheEnabled; + private final boolean requestLogEnabled; + + public HystrixCommandExecutionConfig(int semaphoreMaxConcurrentRequests, HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy, + boolean threadInterruptOnTimeout, String threadPoolKeyOverride, boolean timeoutEnabled, + int timeoutInMilliseconds, boolean fallbackEnabled, int fallbackMaxConcurrentRequests, + boolean requestCacheEnabled, boolean requestLogEnabled) { + this.semaphoreMaxConcurrentRequests = semaphoreMaxConcurrentRequests; + this.isolationStrategy = isolationStrategy; + this.threadInterruptOnTimeout = threadInterruptOnTimeout; + this.threadPoolKeyOverride = threadPoolKeyOverride; + this.timeoutEnabled = timeoutEnabled; + this.timeoutInMilliseconds = timeoutInMilliseconds; + this.fallbackEnabled = fallbackEnabled; + this.fallbackMaxConcurrentRequest = fallbackMaxConcurrentRequests; + this.requestCacheEnabled = requestCacheEnabled; + this.requestLogEnabled = requestLogEnabled; + + } + + public int getSemaphoreMaxConcurrentRequests() { + return semaphoreMaxConcurrentRequests; + } + + public HystrixCommandProperties.ExecutionIsolationStrategy getIsolationStrategy() { + return isolationStrategy; + } + + public boolean isThreadInterruptOnTimeout() { + return threadInterruptOnTimeout; + } + + public String getThreadPoolKeyOverride() { + return threadPoolKeyOverride; + } + + public boolean isTimeoutEnabled() { + return timeoutEnabled; + } + + public int getTimeoutInMilliseconds() { + return timeoutInMilliseconds; + } + + public boolean isFallbackEnabled() { + return fallbackEnabled; + } + + public int getFallbackMaxConcurrentRequest() { + return fallbackMaxConcurrentRequest; + } + + public boolean isRequestCacheEnabled() { + return requestCacheEnabled; + } + + public boolean isRequestLogEnabled() { + return requestLogEnabled; + } + } + + public static class HystrixCommandMetricsConfig { + private final int healthIntervalInMilliseconds; + private final boolean rollingPercentileEnabled; + private final int rollingPercentileNumberOfBuckets; + private final int rollingPercentileBucketSizeInMilliseconds; + private final int rollingCounterNumberOfBuckets; + private final int rollingCounterBucketSizeInMilliseconds; + + public HystrixCommandMetricsConfig(int healthIntervalInMilliseconds, boolean rollingPercentileEnabled, int rollingPercentileNumberOfBuckets, + int rollingPercentileBucketSizeInMilliseconds, int rollingCounterNumberOfBuckets, + int rollingCounterBucketSizeInMilliseconds) { + this.healthIntervalInMilliseconds = healthIntervalInMilliseconds; + this.rollingPercentileEnabled = rollingPercentileEnabled; + this.rollingPercentileNumberOfBuckets = rollingPercentileNumberOfBuckets; + this.rollingPercentileBucketSizeInMilliseconds = rollingPercentileBucketSizeInMilliseconds; + this.rollingCounterNumberOfBuckets = rollingCounterNumberOfBuckets; + this.rollingCounterBucketSizeInMilliseconds = rollingCounterBucketSizeInMilliseconds; + } + + public int getHealthIntervalInMilliseconds() { + return healthIntervalInMilliseconds; + } + + public boolean isRollingPercentileEnabled() { + return rollingPercentileEnabled; + } + + public int getRollingPercentileNumberOfBuckets() { + return rollingPercentileNumberOfBuckets; + } + + public int getRollingPercentileBucketSizeInMilliseconds() { + return rollingPercentileBucketSizeInMilliseconds; + } + + public int getRollingCounterNumberOfBuckets() { + return rollingCounterNumberOfBuckets; + } + + public int getRollingCounterBucketSizeInMilliseconds() { + return rollingCounterBucketSizeInMilliseconds; + } + } +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfiguration.java b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfiguration.java new file mode 100644 index 000000000..b1933bd41 --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfiguration.java @@ -0,0 +1,54 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.config; + +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixThreadPoolKey; + +import java.util.Map; + +public class HystrixConfiguration { + private final Map commandConfig; + private final Map threadPoolConfig; + private final Map collapserConfig; + + private HystrixConfiguration(Map commandConfig, + Map threadPoolConfig, + Map collapserConfig) { + this.commandConfig = commandConfig; + this.threadPoolConfig = threadPoolConfig; + this.collapserConfig = collapserConfig; + } + + public static HystrixConfiguration from(Map commandConfig, + Map threadPoolConfig, + Map collapserConfig) { + return new HystrixConfiguration(commandConfig, threadPoolConfig, collapserConfig); + } + + public Map getCommandConfig() { + return commandConfig; + } + + public Map getThreadPoolConfig() { + return threadPoolConfig; + } + + public Map getCollapserConfig() { + return collapserConfig; + } +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfigurationStream.java b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfigurationStream.java new file mode 100644 index 000000000..290497690 --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixConfigurationStream.java @@ -0,0 +1,139 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.config; + +import com.netflix.hystrix.HystrixCollapserKey; +import com.netflix.hystrix.HystrixCollapserMetrics; +import com.netflix.hystrix.HystrixCollapserProperties; +import com.netflix.hystrix.HystrixCommandGroupKey; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixCommandMetrics; +import com.netflix.hystrix.HystrixCommandProperties; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.HystrixThreadPoolMetrics; +import com.netflix.hystrix.HystrixThreadPoolProperties; +import rx.Observable; +import rx.functions.Func0; +import rx.functions.Func1; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * This class samples current Hystrix configuration and exposes that as a stream + */ +public class HystrixConfigurationStream { + + private final int intervalInMilliseconds; + private final Observable timer; + + public HystrixConfigurationStream(final int intervalInMilliseconds) { + this.intervalInMilliseconds = intervalInMilliseconds; + this.timer = Observable.defer(new Func0>() { + @Override + public Observable call() { + return Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS); + } + }); + } + + public Observable observe() { + return timer.map(getAllConfig); + } + + public Observable> observeCommandConfiguration() { + return timer.map(getAllCommandConfig); + } + + public Observable> observeThreadPoolConfiguration() { + return timer.map(getAllThreadPoolConfig); + } + + public Observable> observeCollapserConfiguration() { + return timer.map(getAllCollapserConfig); + } + + public int getIntervalInMilliseconds() { + return this.intervalInMilliseconds; + } + + private static HystrixCommandConfiguration sampleCommandConfiguration(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, + HystrixCommandGroupKey groupKey, HystrixCommandProperties commandProperties) { + return HystrixCommandConfiguration.sample(commandKey, threadPoolKey, groupKey, commandProperties); + } + + private static HystrixThreadPoolConfiguration sampleThreadPoolConfiguration(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { + return HystrixThreadPoolConfiguration.sample(threadPoolKey, threadPoolProperties); + } + + private static HystrixCollapserConfiguration sampleCollapserConfiguration(HystrixCollapserKey collapserKey, HystrixCollapserProperties collapserProperties) { + return HystrixCollapserConfiguration.sample(collapserKey, collapserProperties); + } + + private static final Func1> getAllCommandConfig = + new Func1>() { + @Override + public Map call(Long timestamp) { + Map commandConfigPerKey = new HashMap(); + for (HystrixCommandMetrics commandMetrics: HystrixCommandMetrics.getInstances()) { + HystrixCommandKey commandKey = commandMetrics.getCommandKey(); + HystrixThreadPoolKey threadPoolKey = commandMetrics.getThreadPoolKey(); + HystrixCommandGroupKey groupKey = commandMetrics.getCommandGroup(); + commandConfigPerKey.put(commandKey, sampleCommandConfiguration(commandKey, threadPoolKey, groupKey, commandMetrics.getProperties())); + } + return commandConfigPerKey; + } + }; + + private static final Func1> getAllThreadPoolConfig = + new Func1>() { + @Override + public Map call(Long timestamp) { + Map threadPoolConfigPerKey = new HashMap(); + for (HystrixThreadPoolMetrics threadPoolMetrics: HystrixThreadPoolMetrics.getInstances()) { + HystrixThreadPoolKey threadPoolKey = threadPoolMetrics.getThreadPoolKey(); + threadPoolConfigPerKey.put(threadPoolKey, sampleThreadPoolConfiguration(threadPoolKey, threadPoolMetrics.getProperties())); + } + return threadPoolConfigPerKey; + } + }; + + private static final Func1> getAllCollapserConfig = + new Func1>() { + @Override + public Map call(Long timestamp) { + Map collapserConfigPerKey = new HashMap(); + for (HystrixCollapserMetrics collapserMetrics: HystrixCollapserMetrics.getInstances()) { + HystrixCollapserKey collapserKey = collapserMetrics.getCollapserKey(); + collapserConfigPerKey.put(collapserKey, sampleCollapserConfiguration(collapserKey, collapserMetrics.getProperties())); + } + return collapserConfigPerKey; + } + }; + + private static final Func1 getAllConfig = + new Func1() { + @Override + public HystrixConfiguration call(Long timestamp) { + return HystrixConfiguration.from( + getAllCommandConfig.call(timestamp), + getAllThreadPoolConfig.call(timestamp), + getAllCollapserConfig.call(timestamp) + ); + } + }; +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java new file mode 100644 index 000000000..9d2f50f5a --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/config/HystrixThreadPoolConfiguration.java @@ -0,0 +1,82 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.config; + +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.HystrixThreadPoolProperties; + +public class HystrixThreadPoolConfiguration { + //The idea is for this object to be serialized off-box. For future-proofing, I'm adding a version so that changing config over time can be handled gracefully + private static final String VERSION = "1"; + private final HystrixThreadPoolKey threadPoolKey; + private final int coreSize; + private final int maxQueueSize; + private final int queueRejectionThreshold; + private final int keepAliveTimeInMinutes; + private final int rollingCounterNumberOfBuckets; + private final int rollingCounterBucketSizeInMilliseconds; + + private HystrixThreadPoolConfiguration(HystrixThreadPoolKey threadPoolKey, int coreSize, int maxQueueSize, int queueRejectionThreshold, + int keepAliveTimeInMinutes, int rollingCounterNumberOfBuckets, + int rollingCounterBucketSizeInMilliseconds) { + this.threadPoolKey = threadPoolKey; + this.coreSize = coreSize; + this.maxQueueSize = maxQueueSize; + this.queueRejectionThreshold = queueRejectionThreshold; + this.keepAliveTimeInMinutes = keepAliveTimeInMinutes; + this.rollingCounterNumberOfBuckets = rollingCounterNumberOfBuckets; + this.rollingCounterBucketSizeInMilliseconds = rollingCounterBucketSizeInMilliseconds; + } + + public static HystrixThreadPoolConfiguration sample(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { + return new HystrixThreadPoolConfiguration( + threadPoolKey, + threadPoolProperties.coreSize().get(), + threadPoolProperties.maxQueueSize().get(), + threadPoolProperties.queueSizeRejectionThreshold().get(), + threadPoolProperties.keepAliveTimeMinutes().get(), + threadPoolProperties.metricsRollingStatisticalWindowBuckets().get(), + threadPoolProperties.metricsRollingStatisticalWindowInMilliseconds().get()); + } + + public HystrixThreadPoolKey getThreadPoolKey() { + return threadPoolKey; + } + + public int getCoreSize() { + return coreSize; + } + + public int getMaxQueueSize() { + return maxQueueSize; + } + + public int getQueueRejectionThreshold() { + return queueRejectionThreshold; + } + + public int getKeepAliveTimeInMinutes() { + return keepAliveTimeInMinutes; + } + + public int getRollingCounterNumberOfBuckets() { + return rollingCounterNumberOfBuckets; + } + + public int getRollingCounterBucketSizeInMilliseconds() { + return rollingCounterBucketSizeInMilliseconds; + } +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/HystrixRequestEvents.java b/hystrix-core/src/main/java/com/netflix/hystrix/metric/HystrixRequestEvents.java new file mode 100644 index 000000000..386c24b5c --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/metric/HystrixRequestEvents.java @@ -0,0 +1,40 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.metric; + +import com.netflix.hystrix.HystrixInvokableInfo; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; + +import java.util.Collection; + +public class HystrixRequestEvents { + + private final HystrixRequestContext requestContext; + private final Collection> executions; + + public HystrixRequestEvents(HystrixRequestContext requestContext, Collection> executions) { + this.requestContext = requestContext; + this.executions = executions; + } + + public HystrixRequestContext getRequestContext() { + return requestContext; + } + + public Collection> getExecutions() { + return executions; + } +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/HystrixRequestEventsStream.java b/hystrix-core/src/main/java/com/netflix/hystrix/metric/HystrixRequestEventsStream.java new file mode 100644 index 000000000..6613b806b --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/metric/HystrixRequestEventsStream.java @@ -0,0 +1,55 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.metric; + +import com.netflix.hystrix.HystrixInvokableInfo; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; +import rx.Observable; +import rx.subjects.PublishSubject; +import rx.subjects.Subject; + +import java.util.Collection; + +/** + */ +public class HystrixRequestEventsStream { + private final Subject writeOnlyRequestEventsSubject; + private final Observable readOnlyRequestEvents; + + /* package */ HystrixRequestEventsStream() { + writeOnlyRequestEventsSubject = PublishSubject.create(); + readOnlyRequestEvents = writeOnlyRequestEventsSubject.share(); + } + + private static final HystrixRequestEventsStream INSTANCE = new HystrixRequestEventsStream(); + + public static HystrixRequestEventsStream getInstance() { + return INSTANCE; + } + + public void shutdown() { + writeOnlyRequestEventsSubject.onCompleted(); + } + + public void write(HystrixRequestContext requestContext, Collection> executions) { + HystrixRequestEvents requestEvents = new HystrixRequestEvents(requestContext, executions); + writeOnlyRequestEventsSubject.onNext(requestEvents); + } + + public Observable observe() { + return readOnlyRequestEvents; + } +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixCommandUtilization.java b/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixCommandUtilization.java new file mode 100644 index 000000000..1a35125ba --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixCommandUtilization.java @@ -0,0 +1,34 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.metric.sample; + +import com.netflix.hystrix.HystrixCommandMetrics; + +public class HystrixCommandUtilization { + private final int concurrentCommandCount; + + public HystrixCommandUtilization(int concurrentCommandCount) { + this.concurrentCommandCount = concurrentCommandCount; + } + + public static HystrixCommandUtilization sample(HystrixCommandMetrics commandMetrics) { + return new HystrixCommandUtilization(commandMetrics.getCurrentConcurrentExecutionCount()); + } + + public int getConcurrentCommandCount() { + return concurrentCommandCount; + } +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixThreadPoolUtilization.java b/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixThreadPoolUtilization.java new file mode 100644 index 000000000..ceba4f566 --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixThreadPoolUtilization.java @@ -0,0 +1,57 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.metric.sample; + +import com.netflix.hystrix.HystrixThreadPoolMetrics; + +public class HystrixThreadPoolUtilization { + private final int currentActiveCount; + private final int currentCorePoolSize; + private final int currentPoolSize; + private final int currentQueueSize; + + public HystrixThreadPoolUtilization(int currentActiveCount, int currentCorePoolSize, int currentPoolSize, int currentQueueSize) { + this.currentActiveCount = currentActiveCount; + this.currentCorePoolSize = currentCorePoolSize; + this.currentPoolSize = currentPoolSize; + this.currentQueueSize = currentQueueSize; + } + + public static HystrixThreadPoolUtilization sample(HystrixThreadPoolMetrics threadPoolMetrics) { + return new HystrixThreadPoolUtilization( + threadPoolMetrics.getCurrentActiveCount().intValue(), + threadPoolMetrics.getCurrentCorePoolSize().intValue(), + threadPoolMetrics.getCurrentPoolSize().intValue(), + threadPoolMetrics.getCurrentQueueSize().intValue() + ); + } + + public int getCurrentActiveCount() { + return currentActiveCount; + } + + public int getCurrentCorePoolSize() { + return currentCorePoolSize; + } + + public int getCurrentPoolSize() { + return currentPoolSize; + } + + public int getCurrentQueueSize() { + return currentQueueSize; + } +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilization.java b/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilization.java new file mode 100644 index 000000000..8cda44939 --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilization.java @@ -0,0 +1,44 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.metric.sample; + +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixThreadPoolKey; + +import java.util.Map; + +public class HystrixUtilization { + private final Map commandUtilizationMap; + private final Map threadPoolUtilizationMap; + + public HystrixUtilization(Map commandUtilizationMap, Map threadPoolUtilizationMap) { + this.commandUtilizationMap = commandUtilizationMap; + this.threadPoolUtilizationMap = threadPoolUtilizationMap; + } + + public static HystrixUtilization from(Map commandUtilizationMap, + Map threadPoolUtilizationMap) { + return new HystrixUtilization(commandUtilizationMap, threadPoolUtilizationMap); + } + + public Map getCommandUtilizationMap() { + return commandUtilizationMap; + } + + public Map getThreadPoolUtilizationMap() { + return threadPoolUtilizationMap; + } +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStream.java b/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStream.java new file mode 100644 index 000000000..b6fea2487 --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStream.java @@ -0,0 +1,108 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.metric.sample; + +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixCommandMetrics; +import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.HystrixThreadPoolMetrics; +import rx.Observable; +import rx.functions.Func0; +import rx.functions.Func1; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * This class samples current Hystrix utilization of resources and exposes that as a stream + */ +public class HystrixUtilizationStream { + + private final int intervalInMilliseconds; + private final Observable timer; + + public HystrixUtilizationStream(final int intervalInMilliseconds) { + this.intervalInMilliseconds = intervalInMilliseconds; + this.timer = Observable.defer(new Func0>() { + @Override + public Observable call() { + return Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS); + } + }); + } + + public Observable observe() { + return timer.map(getAllUtilization); + } + + public Observable> observeCommandUtilization() { + return timer.map(getAllCommandUtilization); + } + + public Observable> observeThreadPoolUtilization() { + return timer.map(getAllThreadPoolUtilization); + } + + public int getIntervalInMilliseconds() { + return this.intervalInMilliseconds; + } + + private static HystrixCommandUtilization sampleCommandUtilization(HystrixCommandMetrics commandMetrics) { + return HystrixCommandUtilization.sample(commandMetrics); + } + + private static HystrixThreadPoolUtilization sampleThreadPoolUtilization(HystrixThreadPoolMetrics threadPoolMetrics) { + return HystrixThreadPoolUtilization.sample(threadPoolMetrics); + } + + private static final Func1> getAllCommandUtilization = + new Func1>() { + @Override + public Map call(Long timestamp) { + Map commandUtilizationPerKey = new HashMap(); + for (HystrixCommandMetrics commandMetrics: HystrixCommandMetrics.getInstances()) { + HystrixCommandKey commandKey = commandMetrics.getCommandKey(); + commandUtilizationPerKey.put(commandKey, sampleCommandUtilization(commandMetrics)); + } + return commandUtilizationPerKey; + } + }; + + private static final Func1> getAllThreadPoolUtilization = + new Func1>() { + @Override + public Map call(Long timestamp) { + Map threadPoolUtilizationPerKey = new HashMap(); + for (HystrixThreadPoolMetrics threadPoolMetrics: HystrixThreadPoolMetrics.getInstances()) { + HystrixThreadPoolKey threadPoolKey = threadPoolMetrics.getThreadPoolKey(); + threadPoolUtilizationPerKey.put(threadPoolKey, sampleThreadPoolUtilization(threadPoolMetrics)); + } + return threadPoolUtilizationPerKey; + } + }; + + private static final Func1 getAllUtilization = + new Func1() { + @Override + public HystrixUtilization call(Long timestamp) { + return HystrixUtilization.from( + getAllCommandUtilization.call(timestamp), + getAllThreadPoolUtilization.call(timestamp) + ); + } + }; +} diff --git a/hystrix-examples-webapp/src/main/webapp/WEB-INF/web.xml b/hystrix-examples-webapp/src/main/webapp/WEB-INF/web.xml index 42a1c74e0..a189a6642 100644 --- a/hystrix-examples-webapp/src/main/webapp/WEB-INF/web.xml +++ b/hystrix-examples-webapp/src/main/webapp/WEB-INF/web.xml @@ -36,4 +36,40 @@ /hystrix.stream + + + HystrixConfigSseServlet + HystrixConfigSseServlet + com.netflix.hystrix.contrib.sample.stream.HystrixConfigSseServlet + + + + HystrixConfigSseServlet + /hystrix/config.stream + + + + + HystrixUtilizationSseServlet + HystrixUtilizationSseServlet + com.netflix.hystrix.contrib.sample.stream.HystrixUtilizationSseServlet + + + + HystrixUtilizationSseServlet + /hystrix/utilization.stream + + + + + HystrixRequestEventsSseServlet + HystrixRequestEventsSseServlet + com.netflix.hystrix.contrib.requests.stream.HystrixRequestEventsSseServlet + + + + HystrixRequestEventsSseServlet + /hystrix/requests.stream + +