diff --git a/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsSseServlet.java b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsSseServlet.java new file mode 100644 index 000000000..c1b606bc4 --- /dev/null +++ b/hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/requests/stream/HystrixRequestEventsSseServlet.java @@ -0,0 +1,274 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.contrib.requests.stream; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.netflix.config.DynamicIntProperty; +import com.netflix.config.DynamicPropertyFactory; +import com.netflix.hystrix.HystrixEventType; +import com.netflix.hystrix.HystrixInvokableInfo; +import com.netflix.hystrix.metric.HystrixRequestEvents; +import com.netflix.hystrix.metric.HystrixRequestEventsStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.Subscriber; +import rx.Subscription; +import rx.schedulers.Schedulers; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + */ +public class HystrixRequestEventsSseServlet extends HttpServlet { + + private static final Logger logger = LoggerFactory.getLogger(HystrixRequestEventsSseServlet.class); + + private static volatile boolean isDestroyed = false; + + private static final String DELAY_REQ_PARAM_NAME = "delay"; + private static final int DEFAULT_DELAY_IN_MILLISECONDS = 10000; + private static final int DEFAULT_QUEUE_DEPTH = 1000; + private static final String PING = "\n: ping\n"; + + /* used to track number of connections and throttle */ + private static AtomicInteger concurrentConnections = new AtomicInteger(0); + private static DynamicIntProperty maxConcurrentConnections = + DynamicPropertyFactory.getInstance().getIntProperty("hystrix.requests.stream.maxConcurrentConnections", 5); + + private final LinkedBlockingQueue requestQueue = new LinkedBlockingQueue(DEFAULT_QUEUE_DEPTH); + private final JsonFactory jsonFactory = new JsonFactory(); + + /** + * Handle incoming GETs + */ + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + if (isDestroyed) { + response.sendError(503, "Service has been shut down."); + } else { + handleRequest(request, response); + } + } + + /* package-private */ + int getDelayFromHttpRequest(HttpServletRequest req) { + try { + String delay = req.getParameter(DELAY_REQ_PARAM_NAME); + if (delay != null) { + return Math.max(Integer.parseInt(delay), 1); + } + } catch (Throwable ex) { + //silently fail + } + return DEFAULT_DELAY_IN_MILLISECONDS; + } + + /** + * WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing + * a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at + * shutdown, perhaps from some other servlet's destroy() method. + */ + public static void shutdown() { + isDestroyed = true; + } + + @Override + public void init() throws ServletException { + isDestroyed = false; + } + + /** + * Handle servlet being undeployed by gracefully releasing connections so poller threads stop. + */ + @Override + public void destroy() { + /* set marker so the loops can break out */ + isDestroyed = true; + super.destroy(); + } + + private String convertToString(Collection requests) throws IOException { + StringWriter jsonString = new StringWriter(); + JsonGenerator json = jsonFactory.createGenerator(jsonString); + + json.writeStartArray(); + for (HystrixRequestEvents request : requests) { + convertRequestToJson(json, request); + } + json.writeEndArray(); + json.close(); + return jsonString.getBuffer().toString(); + } + + private void convertRequestToJson(JsonGenerator json, HystrixRequestEvents request) throws IOException { + json.writeStartObject(); + json.writeStringField("request", request.getRequestContext().toString()); + json.writeObjectFieldStart("commands"); + for (HystrixInvokableInfo execution: request.getExecutions()) { + convertExecutionToJson(json, execution); + } + json.writeEndObject(); + json.writeEndObject(); + } + + private void convertExecutionToJson(JsonGenerator json, HystrixInvokableInfo execution) throws IOException { + json.writeObjectFieldStart(execution.getCommandKey().name()); + json.writeNumberField("latency", execution.getExecutionTimeInMilliseconds()); + json.writeArrayFieldStart("events"); + for (HystrixEventType eventType: execution.getExecutionEvents()) { + switch (eventType) { + case EMIT: + json.writeStartObject(); + json.writeNumberField(eventType.name(), execution.getNumberEmissions()); + json.writeEndObject(); + break; + case FALLBACK_EMIT: + json.writeStartObject(); + json.writeNumberField(eventType.name(), execution.getNumberFallbackEmissions()); + json.writeEndObject(); + break; + case COLLAPSED: + json.writeStartObject(); + json.writeNumberField(eventType.name(), execution.getNumberCollapsed()); + json.writeEndObject(); + break; + default: + json.writeString(eventType.name()); + break; + } + } + json.writeEndArray(); + json.writeEndObject(); + } + + /** + * - maintain an open connection with the client + * - on initial connection send latest data of each requested event type + * - subsequently send all changes for each requested event type + * + * @param request incoming HTTP Request + * @param response outgoing HTTP Response (as a streaming response) + * @throws javax.servlet.ServletException + * @throws java.io.IOException + */ + private void handleRequest(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { + final AtomicBoolean moreDataWillBeSent = new AtomicBoolean(true); + Subscription requestsSubscription = null; + + /* ensure we aren't allowing more connections than we want */ + int numberConnections = concurrentConnections.incrementAndGet(); + try { + int maxNumberConnectionsAllowed = maxConcurrentConnections.get(); + if (numberConnections > maxNumberConnectionsAllowed) { + response.sendError(503, "MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed); + } else { + int delay = getDelayFromHttpRequest(request); + + /* initialize response */ + response.setHeader("Content-Type", "text/event-stream;charset=UTF-8"); + response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate"); + response.setHeader("Pragma", "no-cache"); + + final PrintWriter writer = response.getWriter(); + + //since the sample stream is based on Observable.interval, events will get published on an RxComputation thread + //since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext + requestsSubscription = HystrixRequestEventsStream.getInstance() + .observe() + .observeOn(Schedulers.io()) + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + logger.error("HystrixRequestEventsSseServlet received unexpected OnCompleted from request stream"); + moreDataWillBeSent.set(false); + } + + @Override + public void onError(Throwable e) { + moreDataWillBeSent.set(false); + } + + @Override + public void onNext(HystrixRequestEvents requestEvents) { + if (requestEvents != null) { + requestQueue.offer(requestEvents); + } + } + }); + + while (moreDataWillBeSent.get() && !isDestroyed) { + try { + if (requestQueue.isEmpty()) { + try { + writer.print(PING); + writer.flush(); + } catch (Throwable t) { + throw new IOException("Exception while writing ping"); + } + + if (writer.checkError()) { + throw new IOException("io error"); + } + } else { + List l = new ArrayList(); + requestQueue.drainTo(l); + String requestEventsAsStr = convertToString(l); + //try { + //} catch (IOException ioe) { + // //exception while converting String to JSON + // logger.error("Error converting configuration to JSON ", ioe); + //} + if (requestEventsAsStr != null) { + try { + writer.print("data: " + requestEventsAsStr + "\n\n"); + // explicitly check for client disconnect - PrintWriter does not throw exceptions + if (writer.checkError()) { + throw new IOException("io error"); + } + writer.flush(); + } catch (IOException ioe) { + moreDataWillBeSent.set(false); + } + } + } + Thread.sleep(delay); + } catch (InterruptedException e) { + moreDataWillBeSent.set(false); + } + } + } + } finally { + concurrentConnections.decrementAndGet(); + if (requestsSubscription != null && !requestsSubscription.isUnsubscribed()) { + requestsSubscription.unsubscribe(); + } + } + } +} + diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixInvokableInfo.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixInvokableInfo.java index b1b405de5..6e8481a42 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixInvokableInfo.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixInvokableInfo.java @@ -19,52 +19,51 @@ public interface HystrixInvokableInfo { - public HystrixCommandGroupKey getCommandGroup(); + HystrixCommandGroupKey getCommandGroup(); - public HystrixCommandKey getCommandKey(); + HystrixCommandKey getCommandKey(); - public HystrixThreadPoolKey getThreadPoolKey(); + HystrixThreadPoolKey getThreadPoolKey(); - public HystrixCommandMetrics getMetrics(); + HystrixCommandMetrics getMetrics(); - public HystrixCommandProperties getProperties(); + HystrixCommandProperties getProperties(); - public boolean isCircuitBreakerOpen(); + boolean isCircuitBreakerOpen(); - public boolean isExecutionComplete(); + boolean isExecutionComplete(); - public boolean isExecutedInThread(); + boolean isExecutedInThread(); - public boolean isSuccessfulExecution(); + boolean isSuccessfulExecution(); - public boolean isFailedExecution(); + boolean isFailedExecution(); - public Throwable getFailedExecutionException(); + Throwable getFailedExecutionException(); - public boolean isResponseFromFallback(); + boolean isResponseFromFallback(); - public boolean isResponseTimedOut(); + boolean isResponseTimedOut(); - public boolean isResponseShortCircuited(); + boolean isResponseShortCircuited(); - public boolean isResponseFromCache(); + boolean isResponseFromCache(); - public boolean isResponseRejected(); + boolean isResponseRejected(); boolean isResponseSemaphoreRejected(); boolean isResponseThreadPoolRejected(); - public List getExecutionEvents(); + List getExecutionEvents(); - public int getNumberEmissions(); + int getNumberEmissions(); - public int getNumberFallbackEmissions(); + int getNumberFallbackEmissions(); int getNumberCollapsed(); - public int getExecutionTimeInMilliseconds(); - - public long getCommandRunStartTimeInNanos(); + int getExecutionTimeInMilliseconds(); + long getCommandRunStartTimeInNanos(); } \ No newline at end of file diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixRequestLog.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixRequestLog.java index 1779b7e1b..e94586556 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixRequestLog.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixRequestLog.java @@ -15,6 +15,15 @@ */ package com.netflix.hystrix; +import com.netflix.hystrix.metric.HystrixRequestEventsStream; +import com.netflix.hystrix.strategy.HystrixPlugins; +import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableHolder; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -24,15 +33,6 @@ import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.netflix.hystrix.strategy.HystrixPlugins; -import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; -import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; -import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableHolder; -import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle; - /** * Log of {@link HystrixCommand} executions and events during the current request. */ @@ -57,9 +57,10 @@ public HystrixRequestLog initialValue() { } public void shutdown(HystrixRequestLog value) { - // nothing to shutdown + //write this value to the Request stream + HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread(); + HystrixRequestEventsStream.getInstance().write(requestContext, value.getAllExecutedCommands()); } - }); /** diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/HystrixRequestEvents.java b/hystrix-core/src/main/java/com/netflix/hystrix/metric/HystrixRequestEvents.java new file mode 100644 index 000000000..386c24b5c --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/metric/HystrixRequestEvents.java @@ -0,0 +1,40 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.metric; + +import com.netflix.hystrix.HystrixInvokableInfo; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; + +import java.util.Collection; + +public class HystrixRequestEvents { + + private final HystrixRequestContext requestContext; + private final Collection> executions; + + public HystrixRequestEvents(HystrixRequestContext requestContext, Collection> executions) { + this.requestContext = requestContext; + this.executions = executions; + } + + public HystrixRequestContext getRequestContext() { + return requestContext; + } + + public Collection> getExecutions() { + return executions; + } +} diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/HystrixRequestEventsStream.java b/hystrix-core/src/main/java/com/netflix/hystrix/metric/HystrixRequestEventsStream.java new file mode 100644 index 000000000..6613b806b --- /dev/null +++ b/hystrix-core/src/main/java/com/netflix/hystrix/metric/HystrixRequestEventsStream.java @@ -0,0 +1,55 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.hystrix.metric; + +import com.netflix.hystrix.HystrixInvokableInfo; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; +import rx.Observable; +import rx.subjects.PublishSubject; +import rx.subjects.Subject; + +import java.util.Collection; + +/** + */ +public class HystrixRequestEventsStream { + private final Subject writeOnlyRequestEventsSubject; + private final Observable readOnlyRequestEvents; + + /* package */ HystrixRequestEventsStream() { + writeOnlyRequestEventsSubject = PublishSubject.create(); + readOnlyRequestEvents = writeOnlyRequestEventsSubject.share(); + } + + private static final HystrixRequestEventsStream INSTANCE = new HystrixRequestEventsStream(); + + public static HystrixRequestEventsStream getInstance() { + return INSTANCE; + } + + public void shutdown() { + writeOnlyRequestEventsSubject.onCompleted(); + } + + public void write(HystrixRequestContext requestContext, Collection> executions) { + HystrixRequestEvents requestEvents = new HystrixRequestEvents(requestContext, executions); + writeOnlyRequestEventsSubject.onNext(requestEvents); + } + + public Observable observe() { + return readOnlyRequestEvents; + } +} diff --git a/hystrix-examples-webapp/src/main/webapp/WEB-INF/web.xml b/hystrix-examples-webapp/src/main/webapp/WEB-INF/web.xml index c691975a7..a189a6642 100644 --- a/hystrix-examples-webapp/src/main/webapp/WEB-INF/web.xml +++ b/hystrix-examples-webapp/src/main/webapp/WEB-INF/web.xml @@ -60,4 +60,16 @@ /hystrix/utilization.stream + + + HystrixRequestEventsSseServlet + HystrixRequestEventsSseServlet + com.netflix.hystrix.contrib.requests.stream.HystrixRequestEventsSseServlet + + + + HystrixRequestEventsSseServlet + /hystrix/requests.stream + +