Skip to content

Commit

Permalink
Use proper executor for failing requests when connection closes
Browse files Browse the repository at this point in the history
Now use the executor declared by the handler rather than generic, since using
generic is trappy wrt. deadlocks.

Closes elastic#109225
  • Loading branch information
henningandersen committed May 31, 2024
1 parent 84ca3be commit 29cd7ad
Showing 1 changed file with 24 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ protected void handleInternalSendException(
}
final var sendRequestException = new SendRequestTransportException(node, action, failure);
final var handler = contextToNotify.handler();
final var executor = getInternalSendExceptionExecutor(handler.executor());
final var executor = getAvoidStackOverflowExecutor(handler.executor());
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() {
Expand Down Expand Up @@ -1027,7 +1027,7 @@ public void onRejection(Exception e) {
});
}

private Executor getInternalSendExceptionExecutor(Executor handlerExecutor) {
private Executor getAvoidStackOverflowExecutor(Executor handlerExecutor) {
if (lifecycle.stoppedOrClosed()) {
// too late to try and dispatch anywhere else, let's just use the calling thread
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
Expand Down Expand Up @@ -1342,36 +1342,31 @@ public void onConnectionClosed(Transport.Connection connection) {
return;
}

// Callback that an exception happened, but on a different thread since we don't want handlers to worry about stack overflows.
final var executor = threadPool.generic();
assert executor.isShutdown() == false : "connections should all be closed before threadpool shuts down";
executor.execute(new AbstractRunnable() {
@Override
public void doRun() {
for (Transport.ResponseContext<?> holderToNotify : pruned) {
if (tracerLog.isTraceEnabled() && shouldTraceAction(holderToNotify.action())) {
tracerLog.trace(
"[{}][{}] pruning request because connection to node [{}] closed",
holderToNotify.requestId(),
holderToNotify.action(),
connection.getNode()
);
}
holderToNotify.handler().handleException(new NodeDisconnectedException(connection.getNode(), holderToNotify.action()));
}
}

@Override
public void onFailure(Exception e) {
assert false : e;
logger.warn(() -> "failed to notify response handler on connection close [" + connection + "]", e);
for (Transport.ResponseContext<?> holderToNotify : pruned) {
if (tracerLog.isTraceEnabled() && shouldTraceAction(holderToNotify.action())) {
tracerLog.trace(
"[{}][{}] pruning request because connection to node [{}] closed",
holderToNotify.requestId(),
holderToNotify.action(),
connection.getNode()
);
}
NodeDisconnectedException exception = new NodeDisconnectedException(connection.getNode(), holderToNotify.action());

@Override
public String toString() {
return "onConnectionClosed(" + connection.getNode() + ")";
TransportResponseHandler<?> handler = holderToNotify.handler();
// Callback that an exception happened, but on a different thread since we don't want handlers to worry about stack overflows.
final var executor = getAvoidStackOverflowExecutor(handler.executor());
if (executor == EsExecutors.DIRECT_EXECUTOR_SERVICE) {
handler.handleException(exception);
} else {
executor.execute(new ForkingResponseHandlerRunnable(handler, exception) {
@Override
protected void doRun() {
handler.handleException(exception);
}
});
}
});
}
}

final class TimeoutHandler implements Runnable {
Expand Down

0 comments on commit 29cd7ad

Please sign in to comment.