Skip to content

Commit

Permalink
Added buffered SSE request stream that emits a collection of requests…
Browse files Browse the repository at this point in the history
… on a timer
  • Loading branch information
Matt Jacobs committed Jan 23, 2016
1 parent 6b857d0 commit fc10998
Show file tree
Hide file tree
Showing 6 changed files with 414 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix.contrib.requests.stream;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixInvokableInfo;
import com.netflix.hystrix.metric.HystrixRequestEvents;
import com.netflix.hystrix.metric.HystrixRequestEventsStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Subscriber;
import rx.Subscription;
import rx.schedulers.Schedulers;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
*/
public class HystrixRequestEventsSseServlet extends HttpServlet {

private static final Logger logger = LoggerFactory.getLogger(HystrixRequestEventsSseServlet.class);

private static volatile boolean isDestroyed = false;

private static final String DELAY_REQ_PARAM_NAME = "delay";
private static final int DEFAULT_DELAY_IN_MILLISECONDS = 10000;
private static final int DEFAULT_QUEUE_DEPTH = 1000;
private static final String PING = "\n: ping\n";

/* used to track number of connections and throttle */
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections =
DynamicPropertyFactory.getInstance().getIntProperty("hystrix.requests.stream.maxConcurrentConnections", 5);

private final LinkedBlockingQueue<HystrixRequestEvents> requestQueue = new LinkedBlockingQueue<HystrixRequestEvents>(DEFAULT_QUEUE_DEPTH);
private final JsonFactory jsonFactory = new JsonFactory();

/**
* Handle incoming GETs
*/
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
if (isDestroyed) {
response.sendError(503, "Service has been shut down.");
} else {
handleRequest(request, response);
}
}

/* package-private */
int getDelayFromHttpRequest(HttpServletRequest req) {
try {
String delay = req.getParameter(DELAY_REQ_PARAM_NAME);
if (delay != null) {
return Math.max(Integer.parseInt(delay), 1);
}
} catch (Throwable ex) {
//silently fail
}
return DEFAULT_DELAY_IN_MILLISECONDS;
}

/**
* WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing
* a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at
* shutdown, perhaps from some other servlet's destroy() method.
*/
public static void shutdown() {
isDestroyed = true;
}

@Override
public void init() throws ServletException {
isDestroyed = false;
}

/**
* Handle servlet being undeployed by gracefully releasing connections so poller threads stop.
*/
@Override
public void destroy() {
/* set marker so the loops can break out */
isDestroyed = true;
super.destroy();
}

private String convertToString(Collection<HystrixRequestEvents> requests) throws IOException {
StringWriter jsonString = new StringWriter();
JsonGenerator json = jsonFactory.createGenerator(jsonString);

json.writeStartArray();
for (HystrixRequestEvents request : requests) {
convertRequestToJson(json, request);
}
json.writeEndArray();
json.close();
return jsonString.getBuffer().toString();
}

private void convertRequestToJson(JsonGenerator json, HystrixRequestEvents request) throws IOException {
json.writeStartObject();
json.writeStringField("request", request.getRequestContext().toString());
json.writeObjectFieldStart("commands");
for (HystrixInvokableInfo<?> execution: request.getExecutions()) {
convertExecutionToJson(json, execution);
}
json.writeEndObject();
json.writeEndObject();
}

private void convertExecutionToJson(JsonGenerator json, HystrixInvokableInfo<?> execution) throws IOException {
json.writeObjectFieldStart(execution.getCommandKey().name());
json.writeNumberField("latency", execution.getExecutionTimeInMilliseconds());
json.writeArrayFieldStart("events");
for (HystrixEventType eventType: execution.getExecutionEvents()) {
switch (eventType) {
case EMIT:
json.writeStartObject();
json.writeNumberField(eventType.name(), execution.getNumberEmissions());
json.writeEndObject();
break;
case FALLBACK_EMIT:
json.writeStartObject();
json.writeNumberField(eventType.name(), execution.getNumberFallbackEmissions());
json.writeEndObject();
break;
case COLLAPSED:
json.writeStartObject();
json.writeNumberField(eventType.name(), execution.getNumberCollapsed());
json.writeEndObject();
break;
default:
json.writeString(eventType.name());
break;
}
}
json.writeEndArray();
json.writeEndObject();
}

/**
* - maintain an open connection with the client
* - on initial connection send latest data of each requested event type
* - subsequently send all changes for each requested event type
*
* @param request incoming HTTP Request
* @param response outgoing HTTP Response (as a streaming response)
* @throws javax.servlet.ServletException
* @throws java.io.IOException
*/
private void handleRequest(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
final AtomicBoolean moreDataWillBeSent = new AtomicBoolean(true);
Subscription requestsSubscription = null;

/* ensure we aren't allowing more connections than we want */
int numberConnections = concurrentConnections.incrementAndGet();
try {
int maxNumberConnectionsAllowed = maxConcurrentConnections.get();
if (numberConnections > maxNumberConnectionsAllowed) {
response.sendError(503, "MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed);
} else {
int delay = getDelayFromHttpRequest(request);

/* initialize response */
response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
response.setHeader("Pragma", "no-cache");

final PrintWriter writer = response.getWriter();

//since the sample stream is based on Observable.interval, events will get published on an RxComputation thread
//since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext
requestsSubscription = HystrixRequestEventsStream.getInstance()
.observe()
.observeOn(Schedulers.io())
.subscribe(new Subscriber<HystrixRequestEvents>() {
@Override
public void onCompleted() {
logger.error("HystrixRequestEventsSseServlet received unexpected OnCompleted from request stream");
moreDataWillBeSent.set(false);
}

@Override
public void onError(Throwable e) {
moreDataWillBeSent.set(false);
}

@Override
public void onNext(HystrixRequestEvents requestEvents) {
if (requestEvents != null) {
requestQueue.offer(requestEvents);
}
}
});

while (moreDataWillBeSent.get() && !isDestroyed) {
try {
if (requestQueue.isEmpty()) {
try {
writer.print(PING);
writer.flush();
} catch (Throwable t) {
throw new IOException("Exception while writing ping");
}

if (writer.checkError()) {
throw new IOException("io error");
}
} else {
List<HystrixRequestEvents> l = new ArrayList<HystrixRequestEvents>();
requestQueue.drainTo(l);
String requestEventsAsStr = convertToString(l);
//try {
//} catch (IOException ioe) {
// //exception while converting String to JSON
// logger.error("Error converting configuration to JSON ", ioe);
//}
if (requestEventsAsStr != null) {
try {
writer.print("data: " + requestEventsAsStr + "\n\n");
// explicitly check for client disconnect - PrintWriter does not throw exceptions
if (writer.checkError()) {
throw new IOException("io error");
}
writer.flush();
} catch (IOException ioe) {
moreDataWillBeSent.set(false);
}
}
}
Thread.sleep(delay);
} catch (InterruptedException e) {
moreDataWillBeSent.set(false);
}
}
}
} finally {
concurrentConnections.decrementAndGet();
if (requestsSubscription != null && !requestsSubscription.isUnsubscribed()) {
requestsSubscription.unsubscribe();
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,52 +19,51 @@

public interface HystrixInvokableInfo<R> {

public HystrixCommandGroupKey getCommandGroup();
HystrixCommandGroupKey getCommandGroup();

public HystrixCommandKey getCommandKey();
HystrixCommandKey getCommandKey();

public HystrixThreadPoolKey getThreadPoolKey();
HystrixThreadPoolKey getThreadPoolKey();

public HystrixCommandMetrics getMetrics();
HystrixCommandMetrics getMetrics();

public HystrixCommandProperties getProperties();
HystrixCommandProperties getProperties();

public boolean isCircuitBreakerOpen();
boolean isCircuitBreakerOpen();

public boolean isExecutionComplete();
boolean isExecutionComplete();

public boolean isExecutedInThread();
boolean isExecutedInThread();

public boolean isSuccessfulExecution();
boolean isSuccessfulExecution();

public boolean isFailedExecution();
boolean isFailedExecution();

public Throwable getFailedExecutionException();
Throwable getFailedExecutionException();

public boolean isResponseFromFallback();
boolean isResponseFromFallback();

public boolean isResponseTimedOut();
boolean isResponseTimedOut();

public boolean isResponseShortCircuited();
boolean isResponseShortCircuited();

public boolean isResponseFromCache();
boolean isResponseFromCache();

public boolean isResponseRejected();
boolean isResponseRejected();

boolean isResponseSemaphoreRejected();

boolean isResponseThreadPoolRejected();

public List<HystrixEventType> getExecutionEvents();
List<HystrixEventType> getExecutionEvents();

public int getNumberEmissions();
int getNumberEmissions();

public int getNumberFallbackEmissions();
int getNumberFallbackEmissions();

int getNumberCollapsed();

public int getExecutionTimeInMilliseconds();

public long getCommandRunStartTimeInNanos();
int getExecutionTimeInMilliseconds();

long getCommandRunStartTimeInNanos();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@
*/
package com.netflix.hystrix;

import com.netflix.hystrix.metric.HystrixRequestEventsStream;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableHolder;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -24,15 +33,6 @@
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableHolder;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;

/**
* Log of {@link HystrixCommand} executions and events during the current request.
*/
Expand All @@ -57,9 +57,10 @@ public HystrixRequestLog initialValue() {
}

public void shutdown(HystrixRequestLog value) {
// nothing to shutdown
//write this value to the Request stream
HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
HystrixRequestEventsStream.getInstance().write(requestContext, value.getAllExecutedCommands());
}

});

/**
Expand Down
Loading

0 comments on commit fc10998

Please sign in to comment.