From 03e1b2b005f0e2708196ee36d8451b5d56867689 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 11 Nov 2019 08:42:37 +0100 Subject: [PATCH 1/5] Fix Transport Stopped Exception When a node shutdowns, `TransportService` moves to stopped state and then closes connections. If a request is done in between, an exception was thrown that was not retried in replication actions. Now throw a wrapped `NodeClosedException` exception instead, which is correctly handled in replication action. Fixed other usages too. Relates #42612 --- .../elasticsearch/ElasticsearchException.java | 7 +- .../org/elasticsearch/ExceptionsHelper.java | 9 - .../replication/ReplicationOperation.java | 4 +- .../cluster/IndicesClusterStateService.java | 5 +- .../transport/LocalTransportException.java | 35 +++ .../transport/TransportService.java | 9 +- ...tReplicationActionRetryOnClosedNodeIT.java | 228 ++++++++++++++++++ .../test/transport/MockTransportService.java | 12 + 8 files changed, 290 insertions(+), 19 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/transport/LocalTransportException.java create mode 100644 server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index 821686e8894fc..3588074ff1cda 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1036,7 +1036,12 @@ private enum ElasticsearchExceptionHandle { org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException.class, org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException::new, 156, - Version.V_7_5_0); + Version.V_7_5_0), + LOCAL_TRANSPORT_EXCEPTION( + org.elasticsearch.transport.LocalTransportException.class, + org.elasticsearch.transport.LocalTransportException::new, + 157, + Version.V_7_6_0); final Class exceptionClass; final CheckedFunction constructor; diff --git a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java index 99154597e96f0..cc0c1b9c56a24 100644 --- a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java +++ b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.Index; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.transport.TransportException; import java.io.IOException; import java.io.PrintWriter; @@ -193,14 +192,6 @@ public static Throwable unwrap(Throwable t, Class... clazzes) { return null; } - public static boolean isTransportStoppedForAction(final Throwable t, final String action) { - final TransportException maybeTransport = - (TransportException) ExceptionsHelper.unwrap(t, TransportException.class); - return maybeTransport != null - && (maybeTransport.getMessage().equals("TransportService is closed stopped can't send request") - || maybeTransport.getMessage().equals("transport stopped, action: " + action)); - } - /** * Throws the specified exception. If null if specified then true is returned. */ diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index d328f06eb6895..8cea44911bbe7 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -213,9 +213,7 @@ public String toString() { private void onNoLongerPrimary(Exception failure) { final Throwable cause = ExceptionsHelper.unwrapCause(failure); - final boolean nodeIsClosing = - cause instanceof NodeClosedException - || ExceptionsHelper.isTransportStoppedForAction(cause, "internal:cluster/shard/failure"); + final boolean nodeIsClosing = cause instanceof NodeClosedException; final String message; if (nodeIsClosing) { message = String.format(Locale.ROOT, diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index d189dfd33395b..d6632596a8d4a 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -77,6 +77,7 @@ import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.search.SearchService; import org.elasticsearch.snapshots.SnapshotShardsService; @@ -334,8 +335,8 @@ public void backgroundSync(ShardId shardId, RetentionLeases retentionLeases) { ActionListener.wrap( r -> {}, e -> { - if (ExceptionsHelper.isTransportStoppedForAction(e, RetentionLeaseBackgroundSyncAction.ACTION_NAME + "[p]")) { - // we are likely shutting down + if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null ) { + // node shutting down return; } if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) { diff --git a/server/src/main/java/org/elasticsearch/transport/LocalTransportException.java b/server/src/main/java/org/elasticsearch/transport/LocalTransportException.java new file mode 100644 index 0000000000000..ebb3bb12179e4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/LocalTransportException.java @@ -0,0 +1,35 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.ElasticsearchWrapperException; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +public class LocalTransportException extends TransportException implements ElasticsearchWrapperException { + public LocalTransportException(String msg, Throwable cause) { + super(msg, cause); + } + + public LocalTransportException(StreamInput in) throws IOException { + super(in); + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 9689b70dd960d..ba6ed386131cf 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskManager; @@ -265,8 +266,8 @@ public void onFailure(Exception e) { } @Override public void doRun() { - // cf. ExceptionsHelper#isTransportStoppedForAction - TransportException ex = new TransportException("transport stopped, action: " + holderToNotify.action()); + TransportException ex = new LocalTransportException("transport stopped, action: " + holderToNotify.action(), + new NodeClosedException(localNode)); holderToNotify.handler().handleException(ex); } }); @@ -624,8 +625,8 @@ private void sendRequestInternal(final Transport.C * * Do not edit this exception message, it is currently relied upon in production code! */ - // TODO: make a dedicated exception for a stopped transport service? cf. ExceptionsHelper#isTransportStoppedForAction - throw new TransportException("TransportService is closed stopped can't send request"); + throw new LocalTransportException("TransportService is closed stopped can't send request", + new NodeClosedException(localNode)); } if (timeoutHandler != null) { assert options.timeout() != null; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java new file mode 100644 index 0000000000000..35c7c8ee39f4d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java @@ -0,0 +1,228 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support.replication; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.NetworkPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportInterceptor; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; +import org.hamcrest.Matchers; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class TransportReplicationActionRetryOnClosedNodeIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(TestPlugin.class, MockTransportService.TestPlugin.class); + } + + public static class Request extends ReplicationRequest { + public Request(ShardId shardId) { + super(shardId); + } + + public Request(StreamInput in) throws IOException { + super(in); + } + + @Override + public String toString() { + return "test-request"; + } + } + + public static class Response extends ReplicationResponse { + public Response() { + } + + public Response(StreamInput in) throws IOException { + super(in); + } + } + + public static class TestAction extends TransportReplicationAction { + private static final String ACTION_NAME = "internal:test-replication-action"; + private static final ActionType TYPE = new ActionType<>(ACTION_NAME, Response::new); + + @Inject + public TestAction(Settings settings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, + ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, + indexNameExpressionResolver, Request::new, Request::new, ThreadPool.Names.GENERIC); + } + + @Override + protected Response newResponseInstance(StreamInput in) throws IOException { + return new Response(in); + } + + @Override + protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary, + ActionListener> listener) { + listener.onResponse(new PrimaryResult<>(shardRequest, new Response())); + } + + @Override + protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard replica) { + return new ReplicaResult(); + } + } + + public static class TestPlugin extends Plugin implements ActionPlugin, NetworkPlugin { + private CountDownLatch actionRunningLatch = new CountDownLatch(1); + private CountDownLatch actionWaitLatch = new CountDownLatch(1); + private volatile String testActionName; + + public TestPlugin() { + } + + @Override + public List> getActions() { + return List.of(new ActionHandler<>(TestAction.TYPE, TestAction.class)); + } + + @Override + public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) { + return List.of(new TransportInterceptor() { + @Override + public AsyncSender interceptSender(AsyncSender sender) { + return new AsyncSender() { + @Override + public void sendRequest(Transport.Connection connection, String action, TransportRequest request, TransportRequestOptions options, TransportResponseHandler handler) { + // only activated on primary + if (action.equals(testActionName)) { + actionRunningLatch.countDown(); + try { + actionWaitLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + sender.sendRequest(connection, action, request, options, handler); + } + }; + } + }); + } + } + + public void testRetryOnStoppedTransportService() throws Exception { + internalCluster().startMasterOnlyNodes(2); + String primary = internalCluster().startDataOnlyNode(); + assertAcked(prepareCreate("test") + .setSettings(Settings.builder() + .put(indexSettings()) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + )); + + String replica = internalCluster().startDataOnlyNode(); + String coordinator = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); + ensureGreen("test"); + + TestPlugin primaryTestPlugin = getTestPlugin(primary); + // this test only provoked an issue for the primary action, but for completeness, we pick the action randomly + primaryTestPlugin.testActionName = TestAction.ACTION_NAME + (randomBoolean() ? "[p]" : "[r]"); + logger.info("--> Test action {}, primary {}, replica {}", primaryTestPlugin.testActionName, primary, replica); + + AtomicReference response = new AtomicReference<>(); + CountDownLatch responseLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(1); + client(coordinator).execute(TestAction.TYPE, new Request(new ShardId(resolveIndex("test"), 0)), + ActionListener.wrap( + r -> { + assertTrue(response.compareAndSet(null, r)); + responseLatch.countDown(); + doneLatch.countDown(); + }, + e -> { + assertTrue(response.compareAndSet(null, e)); + responseLatch.countDown(); + doneLatch.countDown(); + })); + + + assertTrue(primaryTestPlugin.actionRunningLatch.await(10, TimeUnit.SECONDS)); + + MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, + primary); + // we pause node after TransportService has moved to stopped, but before closing connections, since if connections are closed + // we would not hit the transport service closed case. + primaryTransportService.addOnStopListener(() -> { + primaryTestPlugin.actionWaitLatch.countDown(); + try { + assertTrue(doneLatch.await(10, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + }); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); + + assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); + if (response.get() instanceof Exception) { + throw new AssertionError(response.get()); + } + } + + private TestPlugin getTestPlugin(String node) { + PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, node); + List testPlugins = pluginsService.filterPlugins(TestPlugin.class); + assertThat(testPlugins, Matchers.hasSize(1)); + return testPlugins.get(0); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index a31ee921b5529..f178dcc0946c3 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -92,6 +92,8 @@ public final class MockTransportService extends TransportService { private final Map> openConnections = new HashMap<>(); + private final List onStopListeners = new CopyOnWriteArrayList<>(); + public static class TestPlugin extends Plugin { @Override public List> getSettings() { @@ -527,6 +529,16 @@ public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfi })); } + public void addOnStopListener(Runnable listener) { + onStopListeners.add(listener); + } + + @Override + protected void doStop() { + onStopListeners.forEach(Runnable::run); + super.doStop(); + } + @Override protected void doClose() throws IOException { super.doClose(); From 62af9455b4dde3db8448fd4a506974c4585bf15b Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 11 Nov 2019 10:00:25 +0100 Subject: [PATCH 2/5] Removed LocalTransportException and fixes Turns out new exception was unnecessary. The `Transport is stopped` messages were hardcoded in a few more places, fixed those too. --- .../elasticsearch/ElasticsearchException.java | 7 +--- .../transport/LocalTransportException.java | 35 ------------------- .../transport/TransportService.java | 10 +++--- .../ReplicationOperationTests.java | 8 ++--- .../IndicesClusterStateServiceTests.java | 11 ++++-- .../xpack/ccr/action/ShardFollowNodeTask.java | 3 +- 6 files changed, 16 insertions(+), 58 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/transport/LocalTransportException.java diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index 3588074ff1cda..821686e8894fc 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1036,12 +1036,7 @@ private enum ElasticsearchExceptionHandle { org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException.class, org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException::new, 156, - Version.V_7_5_0), - LOCAL_TRANSPORT_EXCEPTION( - org.elasticsearch.transport.LocalTransportException.class, - org.elasticsearch.transport.LocalTransportException::new, - 157, - Version.V_7_6_0); + Version.V_7_5_0); final Class exceptionClass; final CheckedFunction constructor; diff --git a/server/src/main/java/org/elasticsearch/transport/LocalTransportException.java b/server/src/main/java/org/elasticsearch/transport/LocalTransportException.java deleted file mode 100644 index ebb3bb12179e4..0000000000000 --- a/server/src/main/java/org/elasticsearch/transport/LocalTransportException.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.transport; - -import org.elasticsearch.ElasticsearchWrapperException; -import org.elasticsearch.common.io.stream.StreamInput; - -import java.io.IOException; - -public class LocalTransportException extends TransportException implements ElasticsearchWrapperException { - public LocalTransportException(String msg, Throwable cause) { - super(msg, cause); - } - - public LocalTransportException(StreamInput in) throws IOException { - super(in); - } -} diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index ba6ed386131cf..081f96c6f8c82 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -266,8 +266,9 @@ public void onFailure(Exception e) { } @Override public void doRun() { - TransportException ex = new LocalTransportException("transport stopped, action: " + holderToNotify.action(), - new NodeClosedException(localNode)); + TransportException ex = + new SendRequestTransportException(localNode, holderToNotify.action(), new NodeClosedException(localNode)); + holderToNotify.handler().handleException(ex); } }); @@ -622,11 +623,8 @@ private void sendRequestInternal(final Transport.C /* * If we are not started the exception handling will remove the request holder again and calls the handler to notify the * caller. It will only notify if toStop hasn't done the work yet. - * - * Do not edit this exception message, it is currently relied upon in production code! */ - throw new LocalTransportException("TransportService is closed stopped can't send request", - new NodeClosedException(localNode)); + throw new NodeClosedException(localNode); } if (timeoutHandler != null) { assert options.timeout() != null; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 9f86d190a644a..3038153a3d537 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -44,7 +44,6 @@ import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.SendRequestTransportException; -import org.elasticsearch.transport.TransportException; import java.util.ArrayList; import java.util.Collections; @@ -205,12 +204,9 @@ public void testNoLongerPrimary() throws Exception { if (randomBoolean()) { shardActionFailure = new NodeClosedException(new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT)); } else if (randomBoolean()) { + DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT); shardActionFailure = new SendRequestTransportException( - new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT), ShardStateAction.SHARD_FAILED_ACTION_NAME, - new TransportException("TransportService is closed stopped can't send request")); - } else if (randomBoolean()) { - shardActionFailure = new TransportException( - "transport stopped, action: " + ShardStateAction.SHARD_FAILED_ACTION_NAME); + node, ShardStateAction.SHARD_FAILED_ACTION_NAME, new NodeClosedException(node)); } else { shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead"); } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceTests.java index 3189fa25bc83b..86c573ca3b803 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; @@ -40,10 +41,12 @@ import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.SendRequestTransportException; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportService; import org.junit.Before; @@ -118,9 +121,11 @@ protected void doExecute(Task task, RetentionLeaseBackgroundSyncAction.Request r new AlreadyClosedException("closed"), new IndexShardClosedException(indexShard.shardId()), new TransportException(randomFrom( - "failed", - "TransportService is closed stopped can't send request", - "transport stopped, action: indices:admin/seq_no/retention_lease_background_sync[p]")), + "failed")), + new SendRequestTransportException(null, randomFrom( + "some-action", + "indices:admin/seq_no/retention_lease_background_sync[p]" + ), new NodeClosedException((DiscoveryNode) null)), new RuntimeException("failed")); listener.onFailure(e); if (e.getMessage().equals("failed")) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 4ad0fb1dfd0d6..1ec1ec6b1c173 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -546,8 +546,7 @@ static boolean shouldRetry(String remoteCluster, Exception e) { actual instanceof IndexClosedException || // If follow index is closed actual instanceof ConnectTransportException || actual instanceof NodeClosedException || - actual instanceof NoSuchRemoteClusterException || - (actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")); + actual instanceof NoSuchRemoteClusterException; } // These methods are protected for testing purposes: From 36416148f82680f0eb1f8d76584d2dfc6b038001 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 11 Nov 2019 10:18:47 +0100 Subject: [PATCH 3/5] Checkstyle fixes --- ...ransportReplicationActionRetryOnClosedNodeIT.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java index 35c7c8ee39f4d..c0a070539f2fb 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java @@ -100,8 +100,9 @@ public static class TestAction extends TransportReplicationAction TYPE = new ActionType<>(ACTION_NAME, Response::new); @Inject - public TestAction(Settings settings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, - ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + public TestAction(Settings settings, TransportService transportService, ClusterService clusterService, + IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, indexNameExpressionResolver, Request::new, Request::new, ThreadPool.Names.GENERIC); } @@ -137,13 +138,16 @@ public TestPlugin() { } @Override - public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) { + public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext) { return List.of(new TransportInterceptor() { @Override public AsyncSender interceptSender(AsyncSender sender) { return new AsyncSender() { @Override - public void sendRequest(Transport.Connection connection, String action, TransportRequest request, TransportRequestOptions options, TransportResponseHandler handler) { + public void sendRequest(Transport.Connection connection, String action, + TransportRequest request, TransportRequestOptions options, + TransportResponseHandler handler) { // only activated on primary if (action.equals(testActionName)) { actionRunningLatch.countDown(); From 4e2cc6dd477ce67f16e3d417807b24aae24b4f42 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 11 Nov 2019 22:29:49 +0100 Subject: [PATCH 4/5] Armin comments --- .../cluster/IndicesClusterStateService.java | 2 +- ...tReplicationActionRetryOnClosedNodeIT.java | 19 +++++-------------- .../IndicesClusterStateServiceTests.java | 3 +-- 3 files changed, 7 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index d6632596a8d4a..92e9604651d0e 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -335,7 +335,7 @@ public void backgroundSync(ShardId shardId, RetentionLeases retentionLeases) { ActionListener.wrap( r -> {}, e -> { - if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null ) { + if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) { // node shutting down return; } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java index c0a070539f2fb..58564fee58c06 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java @@ -185,21 +185,12 @@ public void testRetryOnStoppedTransportService() throws Exception { logger.info("--> Test action {}, primary {}, replica {}", primaryTestPlugin.testActionName, primary, replica); AtomicReference response = new AtomicReference<>(); - CountDownLatch responseLatch = new CountDownLatch(1); CountDownLatch doneLatch = new CountDownLatch(1); client(coordinator).execute(TestAction.TYPE, new Request(new ShardId(resolveIndex("test"), 0)), - ActionListener.wrap( - r -> { - assertTrue(response.compareAndSet(null, r)); - responseLatch.countDown(); - doneLatch.countDown(); - }, - e -> { - assertTrue(response.compareAndSet(null, e)); - responseLatch.countDown(); - doneLatch.countDown(); - })); - + ActionListener.runAfter(ActionListener.wrap( + r -> assertTrue(response.compareAndSet(null, r)), + e -> assertTrue(response.compareAndSet(null, e))), + doneLatch::countDown)); assertTrue(primaryTestPlugin.actionRunningLatch.await(10, TimeUnit.SECONDS)); @@ -217,7 +208,7 @@ public void testRetryOnStoppedTransportService() throws Exception { }); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); + assertTrue(doneLatch.await(10, TimeUnit.SECONDS)); if (response.get() instanceof Exception) { throw new AssertionError(response.get()); } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceTests.java index 86c573ca3b803..6a35f268c987e 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceTests.java @@ -120,8 +120,7 @@ protected void doExecute(Task task, RetentionLeaseBackgroundSyncAction.Request r final Exception e = randomFrom( new AlreadyClosedException("closed"), new IndexShardClosedException(indexShard.shardId()), - new TransportException(randomFrom( - "failed")), + new TransportException("failed"), new SendRequestTransportException(null, randomFrom( "some-action", "indices:admin/seq_no/retention_lease_background_sync[p]" From c6c741f5251618cefaa7a2608533a226d4df9759 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Tue, 12 Nov 2019 17:55:59 +0100 Subject: [PATCH 5/5] Fix node in exception. --- .../java/org/elasticsearch/transport/TransportService.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 081f96c6f8c82..d2ec9e815a9fb 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -266,9 +266,8 @@ public void onFailure(Exception e) { } @Override public void doRun() { - TransportException ex = - new SendRequestTransportException(localNode, holderToNotify.action(), new NodeClosedException(localNode)); - + TransportException ex = new SendRequestTransportException(holderToNotify.connection().getNode(), + holderToNotify.action(), new NodeClosedException(localNode)); holderToNotify.handler().handleException(ex); } });