Skip to content

Commit

Permalink
fallback setting + tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
henningandersen committed May 31, 2024
1 parent d279bfa commit 7747639
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
Expand Down Expand Up @@ -90,6 +91,17 @@ public class TransportService extends AbstractLifecycleComponent
public static final String DIRECT_RESPONSE_PROFILE = ".direct";
public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake";

/**
* Undocumented on purpose, may be removed at any time. Only use this if instructed to do so, can have other unintended consequences
* including deadlocks.
*/
public static final Setting<Boolean> ENABLE_STACK_OVERFLOW_AVOIDANCE = Setting.boolSetting(
"transport.enable_stack_protection",
false,
Setting.Property.NodeScope,
Setting.Property.Deprecated
);

private final AtomicBoolean handleIncomingRequests = new AtomicBoolean();
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
protected final Transport transport;
Expand All @@ -105,6 +117,8 @@ public class TransportService extends AbstractLifecycleComponent

private final PendingDirectHandlers pendingDirectHandlers = new PendingDirectHandlers();

private final boolean enableStackOverflowAvoidance;

// An LRU (don't really care about concurrency here) that holds the latest timed out requests so if they
// do show up, we can print more descriptive information about them
final Map<Long, TimeoutInfoHolder> timeoutInfoHandlers = Collections.synchronizedMap(new LinkedHashMap<>(100, .75F, true) {
Expand Down Expand Up @@ -281,6 +295,7 @@ public TransportService(
this.interceptor = transportInterceptor;
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
this.enableStackOverflowAvoidance = ENABLE_STACK_OVERFLOW_AVOIDANCE.get(settings);
remoteClusterService = new RemoteClusterService(settings, this);
responseHandlers = transport.getResponseHandlers();
if (clusterSettings != null) {
Expand Down Expand Up @@ -1358,7 +1373,27 @@ public void onConnectionClosed(Transport.Connection connection) {
// to handle that themselves instead.
final var executor = handler.executor();
if (executor == EsExecutors.DIRECT_EXECUTOR_SERVICE) {
handler.handleException(exception);
if (enableStackOverflowAvoidance == false) {
handler.handleException(exception);
} else {
threadPool.generic().submit(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
handler.handleException(exception);
}

@Override
public void onFailure(Exception e) {
assert false : e;
logger.warn(() -> "failed to notify response handler on connection close [" + connection + "]", e);
}

@Override
public String toString() {
return "onConnectionClosed(" + connection.getNode() + ")";
}
});
}
} else {
executor.execute(new ForkingResponseHandlerRunnable(handler, exception) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -45,6 +46,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;

public class TransportServiceLifecycleTests extends ESTestCase {
Expand Down Expand Up @@ -217,6 +219,56 @@ public void testInternalSendExceptionCompletesHandlerOnCallingThreadIfTransportS
assertThat(getSendRequestException(future, NodeClosedException.class).getMessage(), startsWith("node closed"));
}

public void testOnConnectionClosedUsesHandlerExecutor() {
String executorName = randomFrom(TestNode.EXECUTOR_NAMES);
String expectedExecutor = (executorName.equals(Executors.DIRECT) ? null : executorName);
boolean withSetting = randomBoolean();
Settings settings = withSetting
? Settings.builder().put(TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE.getKey(), false).build()
: Settings.EMPTY;
onConnectionClosedUsesHandlerExecutor(settings, executorName, expectedExecutor);
if (withSetting) {
assertWarnings(
"[transport.enable_stack_protection] setting was deprecated in Elasticsearch and will be removed in a future release."
);
}
}

public void testOnConnetionCloseStackOverflowAvoidance() {
onConnectionClosedUsesHandlerExecutor(
Settings.builder().put(TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE.getKey(), true).build(),
Executors.DIRECT,
ThreadPool.Names.GENERIC
);
assertWarnings(
"[transport.enable_stack_protection] setting was deprecated in Elasticsearch and will be removed in a future release."
);
}

private void onConnectionClosedUsesHandlerExecutor(Settings settings, String executorName, String expectedExecutor) {
try (var nodeA = new TestNode("node-A", settings)) {
final var testThread = Thread.currentThread();
final var future = new PlainActionFuture<TransportResponse.Empty>();
Executor executor = nodeA.getExecutor(executorName);
Transport.Connection connection = nodeA.getDevNullConnection();
nodeA.transportService.sendRequest(
connection,
TestNode.randomActionName(random()),
new TransportRequest.Empty(),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(ActionListener.assertOnce(future.delegateResponse((l, e) -> {
assertThat(EsExecutors.executorName(Thread.currentThread()), equalTo(expectedExecutor));
if (expectedExecutor == null) {
assertSame(testThread, Thread.currentThread());
}
l.onFailure(e);
})), unusedReader(), executor)
);
nodeA.transportService.onConnectionClosed(connection);
expectThrows(ExecutionException.class, NodeDisconnectedException.class, () -> future.get(10, TimeUnit.SECONDS));
}
}

private static <T> Writeable.Reader<T> unusedReader() {
return in -> fail(null, "should not be used");
}
Expand Down Expand Up @@ -251,6 +303,10 @@ private static class TestNode implements Releasable {
final TransportService transportService;

TestNode(String nodeName) {
this(nodeName, Settings.EMPTY);
}

TestNode(String nodeName, Settings settings) {
threadPool = new TestThreadPool(
nodeName,
new ScalingExecutorBuilder(Executors.SCALING_DROP_ON_SHUTDOWN, 3, 3, TimeValue.timeValueSeconds(60), false),
Expand Down Expand Up @@ -281,7 +337,7 @@ public ExecutorService executor(String name) {
};
final var tcpTransport = MockTransportService.newMockTransport(Settings.EMPTY, TransportVersion.current(), threadPool);
transportService = new TransportService(
Settings.EMPTY,
settings,
tcpTransport,
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
Expand Down Expand Up @@ -351,6 +407,25 @@ public TransportVersion getTransportVersion() {
}
};
}

Transport.Connection getDevNullConnection() {
return new CloseableConnection() {
@Override
public DiscoveryNode getNode() {
return transportService.getLocalNode();
}

@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) {
// going nowhere
}

@Override
public TransportVersion getTransportVersion() {
return TransportVersion.current();
}
};
}
}

}

0 comments on commit 7747639

Please sign in to comment.