Skip to content

Commit

Permalink
[BUG] Serialization bugs can cause node drops
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed Jan 12, 2022
1 parent c457272 commit 8238308
Show file tree
Hide file tree
Showing 2 changed files with 400 additions and 41 deletions.
111 changes: 73 additions & 38 deletions server/src/main/java/org/opensearch/transport/InboundHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.threadpool.ThreadPool;

import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -149,27 +150,13 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st
streamInput = namedWriteableStream(message.openOrGetStreamInput());
assertRemoteVersion(streamInput, header.getVersion());
if (header.isError()) {
handlerResponseError(streamInput, handler);
handlerResponseError(requestId, streamInput, handler);
} else {
handleResponse(remoteAddress, streamInput, handler);
}
// Check the entire message has been read
final int nextByte = streamInput.read();
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
if (nextByte != -1) {
throw new IllegalStateException(
"Message not fully read (response) for requestId ["
+ requestId
+ "], handler ["
+ handler
+ "], error ["
+ header.isError()
+ "]; resetting"
);
handleResponse(requestId, remoteAddress, streamInput, handler);
}
} else {
assert header.isError() == false;
handleResponse(remoteAddress, EMPTY_STREAM_INPUT, handler);
handleResponse(requestId, remoteAddress, EMPTY_STREAM_INPUT, handler);
}
}
}
Expand Down Expand Up @@ -246,32 +233,47 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
assertRemoteVersion(stream, header.getVersion());
final RequestHandlerRegistry<T> reg = requestHandlers.getHandler(action);
assert reg != null;
final T request = reg.newRequest(stream);
request.remoteAddress(new TransportAddress(channel.getRemoteAddress()));
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
final int nextByte = stream.read();
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
if (nextByte != -1) {

try {
final T request = reg.newRequest(stream);
request.remoteAddress(new TransportAddress(channel.getRemoteAddress()));

// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
final int nextByte = stream.read();
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
if (nextByte != -1) {
throw new IllegalStateException(
"Message not fully read (request) for requestId ["
+ requestId
+ "], action ["
+ action
+ "], available ["
+ stream.available()
+ "]; resetting"
);
}
final String executor = reg.getExecutor();
if (ThreadPool.Names.SAME.equals(executor)) {
try {
reg.processMessageReceived(request, transportChannel);
} catch (Exception e) {
sendErrorResponse(reg.getAction(), transportChannel, e);
}
} else {
threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel));
}
} catch (EOFException e) {
// Another favor of (de)serialization issues is when stream contains less bytes than
// the request handler needs to deserialize the payload.
throw new IllegalStateException(
"Message not fully read (request) for requestId ["
"Message fully read (request) but more data is expected for requestId ["
+ requestId
+ "], action ["
+ action
+ "], available ["
+ stream.available()
+ "]; resetting"
+ "]; resetting",
e
);
}
final String executor = reg.getExecutor();
if (ThreadPool.Names.SAME.equals(executor)) {
try {
reg.processMessageReceived(request, transportChannel);
} catch (Exception e) {
sendErrorResponse(reg.getAction(), transportChannel, e);
}
} else {
threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel));
}
}
} catch (Exception e) {
sendErrorResponse(action, transportChannel, e);
Expand All @@ -289,6 +291,7 @@ private static void sendErrorResponse(String actionName, TransportChannel transp
}

private <T extends TransportResponse> void handleResponse(
final long requestId,
InetSocketAddress remoteAddress,
final StreamInput stream,
final TransportResponseHandler<T> handler
Expand All @@ -297,6 +300,23 @@ private <T extends TransportResponse> void handleResponse(
try {
response = handler.read(stream);
response.remoteAddress(new TransportAddress(remoteAddress));

if (stream != EMPTY_STREAM_INPUT) {
// Check the entire message has been read
final int nextByte = stream.read();
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
if (nextByte != -1) {
throw new IllegalStateException(
"Message not fully read (response) for requestId ["
+ requestId
+ "], handler ["
+ handler
+ "], error ["
+ false
+ "]; resetting"
);
}
}
} catch (Exception e) {
final Exception serializationException = new TransportSerializationException(
"Failed to deserialize response from handler [" + handler + "]",
Expand All @@ -322,10 +342,25 @@ private <T extends TransportResponse> void doHandleResponse(TransportResponseHan
}
}

private void handlerResponseError(StreamInput stream, final TransportResponseHandler<?> handler) {
private void handlerResponseError(final long requestId, StreamInput stream, final TransportResponseHandler<?> handler) {
Exception error;
try {
error = stream.readException();

// Check the entire message has been read
final int nextByte = stream.read();
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
if (nextByte != -1) {
throw new IllegalStateException(
"Message not fully read (response) for requestId ["
+ requestId
+ "], handler ["
+ handler
+ "], error ["
+ true
+ "]; resetting"
);
}
} catch (Exception e) {
error = new TransportSerializationException(
"Failed to deserialize exception response from stream for handler [" + handler + "]",
Expand Down
Loading

0 comments on commit 8238308

Please sign in to comment.