Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Transport Stopped Exception #48930

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions server/src/main/java/org/elasticsearch/ExceptionsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.Index;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TransportException;

import java.io.IOException;
import java.io.PrintWriter;
Expand Down Expand Up @@ -193,14 +192,6 @@ public static Throwable unwrap(Throwable t, Class<?>... clazzes) {
return null;
}

public static boolean isTransportStoppedForAction(final Throwable t, final String action) {
final TransportException maybeTransport =
(TransportException) ExceptionsHelper.unwrap(t, TransportException.class);
return maybeTransport != null
&& (maybeTransport.getMessage().equals("TransportService is closed stopped can't send request")
|| maybeTransport.getMessage().equals("transport stopped, action: " + action));
}

/**
* Throws the specified exception. If null if specified then <code>true</code> is returned.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,7 @@ public String toString() {

private void onNoLongerPrimary(Exception failure) {
final Throwable cause = ExceptionsHelper.unwrapCause(failure);
final boolean nodeIsClosing =
cause instanceof NodeClosedException
|| ExceptionsHelper.isTransportStoppedForAction(cause, "internal:cluster/shard/failure");
final boolean nodeIsClosing = cause instanceof NodeClosedException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use the ExceptionsHelper.unwrap(e, NodeClosedException.class) != null pattern here too to be a little more resilient to future changes (and present unexpected paths that get us here ...?)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this too, but ended up keeping this as is, since the unwrap's are different in that unwrapCause only unwraps ElasticsearchWrapperException, whereas unwrap unwraps all causes. It could be important in case we end up unwrapping a deep exception from another host? Do you think we should try to follow your suggestion anyway?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah let's not do that. I def. doesn't look impossible for that exception from another node making it here (since it's not trivial to tell it the suggestion is pointless in the first place :)).

final String message;
if (nodeIsClosing) {
message = String.format(Locale.ROOT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.snapshots.SnapshotShardsService;
Expand Down Expand Up @@ -334,8 +335,8 @@ public void backgroundSync(ShardId shardId, RetentionLeases retentionLeases) {
ActionListener.wrap(
r -> {},
e -> {
if (ExceptionsHelper.isTransportStoppedForAction(e, RetentionLeaseBackgroundSyncAction.ACTION_NAME + "[p]")) {
// we are likely shutting down
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
// node shutting down
return;
}
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager;
Expand Down Expand Up @@ -265,8 +266,8 @@ public void onFailure(Exception e) {
}
@Override
public void doRun() {
// cf. ExceptionsHelper#isTransportStoppedForAction
TransportException ex = new TransportException("transport stopped, action: " + holderToNotify.action());
TransportException ex = new SendRequestTransportException(holderToNotify.connection().getNode(),
holderToNotify.action(), new NodeClosedException(localNode));
holderToNotify.handler().handleException(ex);
}
});
Expand Down Expand Up @@ -621,11 +622,8 @@ private <T extends TransportResponse> void sendRequestInternal(final Transport.C
/*
* If we are not started the exception handling will remove the request holder again and calls the handler to notify the
* caller. It will only notify if toStop hasn't done the work yet.
*
* Do not edit this exception message, it is currently relied upon in production code!
*/
// TODO: make a dedicated exception for a stopped transport service? cf. ExceptionsHelper#isTransportStoppedForAction
throw new TransportException("TransportService is closed stopped can't send request");
throw new NodeClosedException(localNode);
}
if (timeoutHandler != null) {
assert options.timeout() != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.TransportException;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -205,12 +204,9 @@ public void testNoLongerPrimary() throws Exception {
if (randomBoolean()) {
shardActionFailure = new NodeClosedException(new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT));
} else if (randomBoolean()) {
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
shardActionFailure = new SendRequestTransportException(
new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT), ShardStateAction.SHARD_FAILED_ACTION_NAME,
new TransportException("TransportService is closed stopped can't send request"));
} else if (randomBoolean()) {
shardActionFailure = new TransportException(
"transport stopped, action: " + ShardStateAction.SHARD_FAILED_ACTION_NAME);
node, ShardStateAction.SHARD_FAILED_ACTION_NAME, new NodeClosedException(node));
} else {
shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.support.replication;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;


@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class TransportReplicationActionRetryOnClosedNodeIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(TestPlugin.class, MockTransportService.TestPlugin.class);
}

public static class Request extends ReplicationRequest<Request> {
public Request(ShardId shardId) {
super(shardId);
}

public Request(StreamInput in) throws IOException {
super(in);
}

@Override
public String toString() {
return "test-request";
}
}

public static class Response extends ReplicationResponse {
public Response() {
}

public Response(StreamInput in) throws IOException {
super(in);
}
}

public static class TestAction extends TransportReplicationAction<Request, Request, Response> {
private static final String ACTION_NAME = "internal:test-replication-action";
private static final ActionType<Response> TYPE = new ActionType<>(ACTION_NAME, Response::new);

@Inject
public TestAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, Request::new, Request::new, ThreadPool.Names.GENERIC);
}

@Override
protected Response newResponseInstance(StreamInput in) throws IOException {
return new Response(in);
}

@Override
protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
ActionListener<PrimaryResult<Request, Response>> listener) {
listener.onResponse(new PrimaryResult<>(shardRequest, new Response()));
}

@Override
protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard replica) {
return new ReplicaResult();
}
}

public static class TestPlugin extends Plugin implements ActionPlugin, NetworkPlugin {
private CountDownLatch actionRunningLatch = new CountDownLatch(1);
private CountDownLatch actionWaitLatch = new CountDownLatch(1);
private volatile String testActionName;

public TestPlugin() {
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of(new ActionHandler<>(TestAction.TYPE, TestAction.class));
}

@Override
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
ThreadContext threadContext) {
return List.of(new TransportInterceptor() {
@Override
public AsyncSender interceptSender(AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action,
TransportRequest request, TransportRequestOptions options,
TransportResponseHandler<T> handler) {
// only activated on primary
if (action.equals(testActionName)) {
actionRunningLatch.countDown();
try {
actionWaitLatch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
sender.sendRequest(connection, action, request, options, handler);
}
};
}
});
}
}

public void testRetryOnStoppedTransportService() throws Exception {
internalCluster().startMasterOnlyNodes(2);
String primary = internalCluster().startDataOnlyNode();
assertAcked(prepareCreate("test")
.setSettings(Settings.builder()
.put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
));

String replica = internalCluster().startDataOnlyNode();
String coordinator = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
ensureGreen("test");

TestPlugin primaryTestPlugin = getTestPlugin(primary);
// this test only provoked an issue for the primary action, but for completeness, we pick the action randomly
primaryTestPlugin.testActionName = TestAction.ACTION_NAME + (randomBoolean() ? "[p]" : "[r]");
logger.info("--> Test action {}, primary {}, replica {}", primaryTestPlugin.testActionName, primary, replica);

AtomicReference<Object> response = new AtomicReference<>();
CountDownLatch doneLatch = new CountDownLatch(1);
client(coordinator).execute(TestAction.TYPE, new Request(new ShardId(resolveIndex("test"), 0)),
ActionListener.runAfter(ActionListener.wrap(
r -> assertTrue(response.compareAndSet(null, r)),
e -> assertTrue(response.compareAndSet(null, e))),
doneLatch::countDown));

assertTrue(primaryTestPlugin.actionRunningLatch.await(10, TimeUnit.SECONDS));

MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
primary);
// we pause node after TransportService has moved to stopped, but before closing connections, since if connections are closed
// we would not hit the transport service closed case.
primaryTransportService.addOnStopListener(() -> {
primaryTestPlugin.actionWaitLatch.countDown();
try {
assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new AssertionError(e);
}
});
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));

assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
if (response.get() instanceof Exception) {
throw new AssertionError(response.get());
}
}

private TestPlugin getTestPlugin(String node) {
PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, node);
List<TestPlugin> testPlugins = pluginsService.filterPlugins(TestPlugin.class);
assertThat(testPlugins, Matchers.hasSize(1));
return testPlugins.get(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
Expand All @@ -40,10 +41,12 @@
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
Expand Down Expand Up @@ -117,10 +120,11 @@ protected void doExecute(Task task, RetentionLeaseBackgroundSyncAction.Request r
final Exception e = randomFrom(
new AlreadyClosedException("closed"),
new IndexShardClosedException(indexShard.shardId()),
new TransportException(randomFrom(
"failed",
"TransportService is closed stopped can't send request",
"transport stopped, action: indices:admin/seq_no/retention_lease_background_sync[p]")),
new TransportException("failed"),
new SendRequestTransportException(null, randomFrom(
"some-action",
"indices:admin/seq_no/retention_lease_background_sync[p]"
), new NodeClosedException((DiscoveryNode) null)),
new RuntimeException("failed"));
listener.onFailure(e);
if (e.getMessage().equals("failed")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public final class MockTransportService extends TransportService {

private final Map<DiscoveryNode, List<Transport.Connection>> openConnections = new HashMap<>();

private final List<Runnable> onStopListeners = new CopyOnWriteArrayList<>();

public static class TestPlugin extends Plugin {
@Override
public List<Setting<?>> getSettings() {
Expand Down Expand Up @@ -527,6 +529,16 @@ public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfi
}));
}

public void addOnStopListener(Runnable listener) {
onStopListeners.add(listener);
}

@Override
protected void doStop() {
onStopListeners.forEach(Runnable::run);
super.doStop();
}

@Override
protected void doClose() throws IOException {
super.doClose();
Expand Down
Loading