Merge pull request opensearch-project#547 from gregschohn/FixHangingRequests

Assorted bugfixes and improvements
gregschohn authored Apr 3, 2024
2 parents 89e3f65 + 5865ca3 commit d264a8c
Showing 21 changed files with 335 additions and 160 deletions.
@@ -15,9 +15,14 @@
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -112,6 +117,10 @@ HttpHeaders convertHeaders(Map<String, String> headers) {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
try {
if (req.decoderResult().isFailure()) {
ctx.close();
return;
}
var specifiedResponse = responseBuilder.apply(new RequestToFirstLineAdapter(req));
var fullResponse = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
@@ -156,8 +165,9 @@ protected void initChannel(SocketChannel ch) {
if (timeout != null) {
pipeline.addLast(new ReadTimeoutHandler(timeout.toMillis(), TimeUnit.MILLISECONDS));
}
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpRequestDecoder());
pipeline.addLast(new HttpObjectAggregator(16*1024));
pipeline.addLast(new HttpResponseEncoder());
pipeline.addLast(makeHandlerFromResponseContext(responseBuilder));
}
});
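The hunks above change what appears to be a small Netty-based HTTP test server in two ways: the request handler now closes the connection when the aggregated request carries a failed decoder result, and the pipeline appears to swap the combined HttpServerCodec for a standalone HttpRequestDecoder paired with an HttpResponseEncoder. Below is a minimal, hypothetical sketch of that arrangement; the class and handler names are illustrative and not taken from this repository.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.*;

// Sketch only: a pipeline that rejects unparseable requests by closing the channel.
class RejectingHttpServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        var p = ch.pipeline();
        p.addLast(new HttpRequestDecoder());            // inbound request decoding only
        p.addLast(new HttpObjectAggregator(16 * 1024)); // aggregate into a FullHttpRequest
        p.addLast(new HttpResponseEncoder());           // outbound response encoding
        p.addLast(new SimpleChannelInboundHandler<FullHttpRequest>() {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
                if (req.decoderResult().isFailure()) {
                    ctx.close(); // malformed request: drop the connection instead of answering
                    return;
                }
                var resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
                ctx.writeAndFlush(resp);
            }
        });
    }
}
```

Closing outright on a decode failure is a simple way to keep a client from waiting indefinitely on a request the server could never answer, which fits the FixHangingRequests theme of the PR.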
@@ -2,6 +2,7 @@

import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;

@@ -10,6 +11,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@Slf4j
public class Accumulation {

enum State {
@@ -73,7 +75,7 @@ public void expire() {
}

public RequestResponsePacketPair getOrCreateTransactionPair(ITrafficStreamKey forTrafficStreamKey,
Instant originTimestamp) {
Instant originTimestamp) {
if (rrPairWithCallback != null) {
return rrPairWithCallback.pair;
}
@@ -12,9 +12,12 @@
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.Callable;
@@ -23,13 +26,14 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

@Slf4j
public class HttpByteBufFormatter {

private static final ThreadLocal<Optional<PacketPrintFormat>> printStyle =
ThreadLocal.withInitial(Optional::empty);

public enum PacketPrintFormat {
TRUNCATED, FULL_BYTES, PARSED_HTTP
TRUNCATED, FULL_BYTES, PARSED_HTTP, PARSED_HTTP_SORTED_HEADERS
}

public static <T> T setPrintStyleForCallable(PacketPrintFormat packetPrintFormat, Callable<T> r) throws Exception {
@@ -72,21 +76,23 @@ public static String httpPacketBufsToString(HttpMessageType msgType, Stream<Byte
case FULL_BYTES:
return httpPacketBufsToString(byteBufStream, Long.MAX_VALUE, releaseByteBufs);
case PARSED_HTTP:
return httpPacketsToPrettyPrintedString(msgType, byteBufStream, releaseByteBufs);
return httpPacketsToPrettyPrintedString(msgType, byteBufStream, false, releaseByteBufs);
case PARSED_HTTP_SORTED_HEADERS:
return httpPacketsToPrettyPrintedString(msgType, byteBufStream, true, releaseByteBufs);
default:
throw new IllegalStateException("Unknown PacketPrintFormat: " + printStyle.get());
}
}

public static String httpPacketsToPrettyPrintedString(HttpMessageType msgType, Stream<ByteBuf> byteBufStream,
boolean releaseByteBufs) {
boolean sortHeaders, boolean releaseByteBufs) {
HttpMessage httpMessage = parseHttpMessageFromBufs(msgType, byteBufStream, releaseByteBufs);
var holderOp = Optional.ofNullable((httpMessage instanceof ByteBufHolder) ? (ByteBufHolder) httpMessage : null);
try {
if (httpMessage instanceof FullHttpRequest) {
return prettyPrintNettyRequest((FullHttpRequest) httpMessage);
return prettyPrintNettyRequest((FullHttpRequest) httpMessage, sortHeaders);
} else if (httpMessage instanceof FullHttpResponse) {
return prettyPrintNettyResponse((FullHttpResponse) httpMessage);
return prettyPrintNettyResponse((FullHttpResponse) httpMessage, sortHeaders);
} else if (httpMessage == null) {
return "[NULL]";
} else {
@@ -98,20 +104,24 @@ public static String httpPacketsToPrettyPrintedString(HttpMessageType msgType, S
}
}

public static String prettyPrintNettyRequest(FullHttpRequest msg) {
public static String prettyPrintNettyRequest(FullHttpRequest msg, boolean sortHeaders) {
var sj = new StringJoiner("\n");
sj.add(msg.method() + " " + msg.uri() + " " + msg.protocolVersion().text());
return prettyPrintNettyMessage(sj, msg, msg.content());
return prettyPrintNettyMessage(sj, sortHeaders, msg, msg.content());
}

static String prettyPrintNettyResponse(FullHttpResponse msg) {
static String prettyPrintNettyResponse(FullHttpResponse msg, boolean sortHeaders) {
var sj = new StringJoiner("\n");
sj.add(msg.protocolVersion().text() + " " + msg.status().code() + " " + msg.status().reasonPhrase());
return prettyPrintNettyMessage(sj, msg, msg.content());
return prettyPrintNettyMessage(sj, sortHeaders, msg, msg.content());
}

private static String prettyPrintNettyMessage(StringJoiner sj, HttpMessage msg, ByteBuf content) {
msg.headers().forEach(kvp -> sj.add(String.format("%s: %s", kvp.getKey(), kvp.getValue())));
private static String prettyPrintNettyMessage(StringJoiner sj, boolean sorted, HttpMessage msg, ByteBuf content) {
var h = msg.headers().entries().stream();
if (sorted) {
h = h.sorted(Map.Entry.comparingByKey());
}
h.forEach(kvp -> sj.add(String.format("%s: %s", kvp.getKey(), kvp.getValue())));
sj.add("");
sj.add(content.toString(StandardCharsets.UTF_8));
return sj.toString();
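The formatter changes above introduce a PARSED_HTTP_SORTED_HEADERS print style and thread a sortHeaders flag through the pretty-printers so header order becomes deterministic. A small self-contained sketch of just the sorting step, assuming Netty's HttpHeaders; the class and method names are invented for illustration.

```java
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import java.util.Map;
import java.util.StringJoiner;

class HeaderPrinterSketch {
    // Render headers one per line, optionally in sorted key order for stable output.
    static String printHeaders(HttpHeaders headers, boolean sorted) {
        var entries = headers.entries().stream();
        if (sorted) {
            entries = entries.sorted(Map.Entry.comparingByKey());
        }
        var sj = new StringJoiner("\n");
        entries.forEach(kvp -> sj.add(kvp.getKey() + ": " + kvp.getValue()));
        return sj.toString();
    }

    public static void main(String[] args) {
        var h = new DefaultHttpHeaders().add("b-header", "2").add("a-header", "1");
        System.out.println(printHeaders(h, true)); // prints a-header before b-header
    }
}
```

Sorted output is mainly useful when diffing or asserting on pretty-printed messages in tests, where header insertion order would otherwise make comparisons flaky.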
@@ -3,6 +3,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.ReferenceCounted;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.replay.datatypes.TransformedPackets;
@@ -115,7 +116,6 @@ private static byte[] getBytesFromByteBuf(ByteBuf buf) {
private static Map<String, Object> fillMap(LinkedHashMap<String, Object> map,
HttpHeaders headers, ByteBuf content) {
String base64body = Base64.getEncoder().encodeToString(getBytesFromByteBuf(content));
content.release();
map.put("body", base64body);
headers.entries().stream().forEach(kvp -> map.put(kvp.getKey(), kvp.getValue()));
return map;
@@ -139,13 +139,17 @@ private static Map<String, Object> convertRequest(@NonNull IReplayContexts.ITupl
return makeSafeMap(context, () -> {
var map = new LinkedHashMap<String, Object>();
var message = HttpByteBufFormatter.parseHttpRequestFromBufs(byteToByteBufStream(data), true);
map.put("Request-URI", message.uri());
map.put("Method", message.method().toString());
map.put("HTTP-Version", message.protocolVersion().toString());
context.setMethod(message.method().toString());
context.setEndpoint(message.uri());
context.setHttpVersion(message.protocolVersion().toString());
return fillMap(map, message.headers(), message.content());
try {
map.put("Request-URI", message.uri());
map.put("Method", message.method().toString());
map.put("HTTP-Version", message.protocolVersion().toString());
context.setMethod(message.method().toString());
context.setEndpoint(message.uri());
context.setHttpVersion(message.protocolVersion().toString());
return fillMap(map, message.headers(), message.content());
} finally {
Optional.ofNullable(message).ifPresent(ReferenceCounted::release);
}
});
}

@@ -154,11 +158,18 @@ private static Map<String, Object> convertResponse(@NonNull IReplayContexts.ITup
return makeSafeMap(context, () -> {
var map = new LinkedHashMap<String, Object>();
var message = HttpByteBufFormatter.parseHttpResponseFromBufs(byteToByteBufStream(data), true);
map.put("HTTP-Version", message.protocolVersion());
map.put(STATUS_CODE_KEY, message.status().code());
map.put("Reason-Phrase", message.status().reasonPhrase());
map.put(RESPONSE_TIME_MS_KEY, latency.toMillis());
return fillMap(map, message.headers(), message.content());
if (message == null) {
return Map.of("Exception", "Message couldn't be parsed as a full http message");
}
try {
map.put("HTTP-Version", message.protocolVersion());
map.put(STATUS_CODE_KEY, message.status().code());
map.put("Reason-Phrase", message.status().reasonPhrase());
map.put(RESPONSE_TIME_MS_KEY, latency.toMillis());
return fillMap(map, message.headers(), message.content());
} finally {
Optional.ofNullable(message).ifPresent(ReferenceCounted::release);
}
});
}
}
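The convertRequest/convertResponse hunks above appear to move buffer cleanup out of fillMap: the parsed Netty message is now released in a finally block on every path, and an unparseable response yields a short error map rather than failing partway through. A hedged sketch of that release-on-every-path pattern follows; the class and method are illustrative stand-ins, not code from this repository.

```java
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.ReferenceCounted;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

class ReleaseOnAllPathsSketch {
    // Build a map view of a parsed response, always releasing the reference-counted message.
    static Map<String, Object> toMap(FullHttpResponse message) {
        if (message == null) {
            return Map.of("Exception", "Message couldn't be parsed as a full http message");
        }
        try {
            var map = new LinkedHashMap<String, Object>();
            map.put("HTTP-Version", message.protocolVersion());
            map.put("Status-Code", message.status().code());
            return map;
        } finally {
            // release even if map-building throws, mirroring the finally block in the diff
            Optional.ofNullable(message).ifPresent(ReferenceCounted::release);
        }
    }
}
```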
@@ -148,6 +148,8 @@ private static void logStartOfWork(Object stringableKey, long newCount, Instant
var requestKey = ctx.getReplayerRequestKey();
logStartOfWork(requestKey, newCount, start, label);

log.atDebug().setMessage(()->"Scheduling request for " + ctx + " to run from [" + start + ", " + end +
" with an interval of " + interval + " for " + numPackets + " packets").log();
var sendResult = networkSendOrchestrator.scheduleRequest(requestKey, ctx, start, interval, packets);
return hookWorkFinishingUpdates(sendResult, originalStart, requestKey, label);
}
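The debug line added above uses SLF4J's fluent API with a message supplier, so the string concatenation only runs when DEBUG is enabled; the terser scheduling log removed from RequestSenderOrchestrator in the next hunk appears to be superseded by it. A minimal sketch of the same idiom, with illustrative class and method names:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;

class LazyLogSketch {
    private static final Logger log = LoggerFactory.getLogger(LazyLogSketch.class);

    // The lambda passed to setMessage() is only evaluated if DEBUG logging is on.
    static void logSchedule(Object ctx, Instant start, Instant end, Duration interval, int numPackets) {
        log.atDebug().setMessage(() -> "Scheduling request for " + ctx + " to run from [" + start + ", " + end +
                "] with an interval of " + interval + " for " + numPackets + " packets").log();
    }
}
```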
@@ -71,7 +71,6 @@ public RequestSenderOrchestrator(ClientConnectionPool clientConnectionPool) {
var finalTunneledResponse =
new StringTrackableCompletableFuture<AggregatedRawResponse>(new CompletableFuture<>(),
()->"waiting for final aggregated response");
log.atDebug().setMessage(()->"Scheduling request for "+requestKey+" at start time "+start).log();
// When a socket connection is attempted could be more precise.
// Ideally, we would match the relative timestamps of when connections were being initiated
// as well as the period between connection and the first bytes sent. However, this code is a
@@ -1,6 +1,7 @@
package org.opensearch.migrations.replay;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import lombok.Lombok;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.replay.datatypes.UniqueSourceRequestKey;
@@ -11,6 +12,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

@Slf4j
@@ -21,6 +23,7 @@ public class ResultsToLogsConsumer implements BiConsumer<SourceTargetCaptureTupl

private final Logger tupleLogger;
private final Logger progressLogger;
private final AtomicInteger tupleCounter;

public ResultsToLogsConsumer() {
this(null, null);
@@ -29,6 +32,7 @@ public ResultsToLogsConsumer() {
public ResultsToLogsConsumer(Logger tupleLogger, Logger progressLogger) {
this.tupleLogger = tupleLogger != null ? tupleLogger : LoggerFactory.getLogger(OUTPUT_TUPLE_JSON_LOGGER);
this.progressLogger = progressLogger != null ? progressLogger : makeTransactionSummaryLogger();
tupleCounter = new AtomicInteger();
}

// set this up so that the preamble prints out once, right after we have a logger
@@ -102,7 +106,8 @@ private Map<String, Object> toJSONObject(SourceTargetCaptureTuple tuple, ParsedH
* @param tuple the RequestResponseResponseTriple object to be converted into json and written to the stream.
*/
public void accept(SourceTargetCaptureTuple tuple, ParsedHttpMessagesAsDicts parsedMessages) {
progressLogger.atInfo().setMessage(()->toTransactionSummaryString(tuple, parsedMessages)).log();
final var index = tupleCounter.getAndIncrement();
progressLogger.atInfo().setMessage(()->toTransactionSummaryString(index, tuple, parsedMessages)).log();
tupleLogger.atInfo().setMessage(() -> {
try {
return PLAIN_MAPPER.writeValueAsString(toJSONObject(tuple, parsedMessages));
@@ -114,32 +119,46 @@ public void accept(SourceTargetCaptureTuple tuple, ParsedHttpMessagesAsDicts par

public static String getTransactionSummaryStringPreamble() {
return new StringJoiner(", ")
.add("SOURCE_STATUS_CODE")
.add("TARGET_STATUS_CODE")
.add("SOURCE_LATENCY")
.add("TARGET_LATENCY")
.add("#")
.add("REQUEST_ID")
.add("ORIGINAL_TIMESTAMP")
.add("SOURCE_STATUS_CODE/TARGET_STATUS_CODE")
.add("SOURCE_REQUEST_SIZE_BYTES/TARGET_REQUEST_SIZE_BYTES")
.add("SOURCE_RESPONSE_SIZE_BYTES/TARGET_RESPONSE_SIZE_BYTES")
.add("SOURCE_LATENCY_MS/TARGET_LATENCY_MS")
.toString();
}

public static String toTransactionSummaryString(SourceTargetCaptureTuple tuple, ParsedHttpMessagesAsDicts parsed) {
public static String toTransactionSummaryString(int index, SourceTargetCaptureTuple tuple, ParsedHttpMessagesAsDicts parsed) {
final String MISSING_STR = "-";
var s = parsed.sourceResponseOp;
var t = parsed.targetResponseOp;
return new StringJoiner(", ")
// SOURCE_STATUS_CODE
.add(s.map(r->""+r.get(ParsedHttpMessagesAsDicts.STATUS_CODE_KEY)).orElse(MISSING_STR))
// TARGET_STATUS_CODE
.add(t.map(r->""+r.get(ParsedHttpMessagesAsDicts.STATUS_CODE_KEY)).orElse(MISSING_STR))
// SOURCE_LATENCY
.add(s.map(r->""+r.get(ParsedHttpMessagesAsDicts.RESPONSE_TIME_MS_KEY)).orElse(MISSING_STR))
// TARGET_LATENCY
.add(t.map(r->""+r.get(ParsedHttpMessagesAsDicts.RESPONSE_TIME_MS_KEY)).orElse(MISSING_STR))
.add(Integer.toString(index))
// REQUEST_ID
.add(formatUniqueRequestKey(tuple.getRequestKey()))
// Original request timestamp
.add(Optional.ofNullable(tuple.sourcePair).map(sp->sp.requestData.getLastPacketTimestamp().toString())
.orElse(MISSING_STR))
// SOURCE/TARGET STATUS_CODE
.add(s.map(r->""+r.get(ParsedHttpMessagesAsDicts.STATUS_CODE_KEY)).orElse(MISSING_STR) + "/" +
t.map(r->""+r.get(ParsedHttpMessagesAsDicts.STATUS_CODE_KEY)).orElse(MISSING_STR))
// SOURCE/TARGET REQUEST_SIZE_BYTES
.add(Optional.ofNullable(tuple.sourcePair).map(sp->sp.requestData.stream().mapToInt(bArr->bArr.length)
.sum()+"")
.orElse(MISSING_STR) + "/" +
Optional.ofNullable(tuple.targetRequestData)
.map(transformedPackets->transformedPackets.streamUnretained()
.mapToInt(ByteBuf::readableBytes)
.sum()+"").orElse(MISSING_STR))
// SOURCE/TARGET RESPONSE_SIZE_BYTES
.add(Optional.ofNullable(tuple.sourcePair).flatMap(sp->Optional.ofNullable(sp.responseData))
.map(rd->rd.stream().mapToInt(bArr->bArr.length).sum()+"").orElse(MISSING_STR) + "/" +
Optional.ofNullable(tuple.targetResponseData)
.map(rd->rd.stream().mapToInt(bArr->bArr.length).sum()+"").orElse(MISSING_STR))
// SOURCE/TARGET LATENCY
.add(s.map(r->""+r.get(ParsedHttpMessagesAsDicts.RESPONSE_TIME_MS_KEY)).orElse(MISSING_STR) + "/" +
t.map(r->""+r.get(ParsedHttpMessagesAsDicts.RESPONSE_TIME_MS_KEY)).orElse(MISSING_STR))
.toString();
}
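The summary-row rework above gives each logged tuple a sequence number from an AtomicInteger and packs the source and target values for status code, request size, response size, and latency into paired "SOURCE/TARGET" columns, with "-" standing in for a missing side. A compact, hypothetical sketch of assembling such a row; the field names and types below are invented for illustration.

```java
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;

class SummaryRowSketch {
    private static final String MISSING = "-";
    private final AtomicInteger tupleCounter = new AtomicInteger();

    // One comma-separated progress row per tuple, matching the preamble's column order.
    String row(String requestId, String timestamp,
               Integer sourceStatus, Integer targetStatus,
               Long sourceLatencyMs, Long targetLatencyMs) {
        return new StringJoiner(", ")
                .add(Integer.toString(tupleCounter.getAndIncrement()))        // #
                .add(requestId)                                               // REQUEST_ID
                .add(timestamp)                                               // ORIGINAL_TIMESTAMP
                .add(orDash(sourceStatus) + "/" + orDash(targetStatus))       // STATUS_CODE pair
                .add(orDash(sourceLatencyMs) + "/" + orDash(targetLatencyMs)) // LATENCY pair
                .toString();
    }

    private static String orDash(Object value) {
        return value == null ? MISSING : value.toString();
    }
}
```

A monotonically increasing counter in each row also makes it easy to tell from the progress log alone how many tuples have been emitted so far.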
