diff --git a/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java
index bbcbaa9ceb559..007357ee54a75 100644
--- a/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java
+++ b/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java
@@ -230,6 +230,13 @@ protected synchronized void processAsyncFetch(List<T> responses, List<FailedNodeException> failures, long fetchingRound) {
+    /**
+     * Clear cache for node, ensuring next fetch will fetch a fresh copy.
+     */
+    synchronized void clearCacheForNode(String nodeId) {
+        cache.remove(nodeId);
+    }
+
diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
--- a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
+++ b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
         asyncFetchStore = ConcurrentCollections.newConcurrentMap();
+    private Set<String> lastSeenEphemeralIds = Collections.emptySet();
 
     @Inject
     public GatewayAllocator(RerouteService rerouteService, NodeClient client) {
@@ -109,6 +117,7 @@ public void applyFailedShards(final RoutingAllocation allocation, final List<FailedShard> failedShards) {
+            final Set<String> newEphemeralIds = StreamSupport.stream(nodes.getDataNodes().spliterator(), false)
+                .map(node -> node.value.getEphemeralId()).collect(Collectors.toSet());
+            // Invalidate the cache if a data node has been added to the cluster. This ensures that we do not cancel a recovery if a node
+            // drops out, we fetch the shard data, then some indexing happens and then the node rejoins the cluster again. There are other
+            // ways we could decide to cancel a recovery based on stale data (e.g. changing allocation filters or a primary failure) but
+            // making the wrong decision here is not catastrophic so we only need to cover the common case.
+            logger.trace(() -> new ParameterizedMessage(
+                "new nodes {} found, clearing primary async-fetch-store cache", Sets.difference(newEphemeralIds, lastSeenEphemeralIds)));
+            asyncFetchStore.values().forEach(fetch -> clearCacheForPrimary(fetch, allocation));
+            // recalc to also (lazily) clear out old nodes.
+            this.lastSeenEphemeralIds = newEphemeralIds;
+        }
+    }
+
+    private static void clearCacheForPrimary(AsyncShardFetch<NodeStoreFilesMetaData> fetch,
+                                             RoutingAllocation allocation) {
+        ShardRouting primary = allocation.routingNodes().activePrimary(fetch.shardId);
+        if (primary != null) {
+            fetch.clearCacheForNode(primary.currentNodeId());
+        }
+    }
+
+    private boolean hasNewNodes(DiscoveryNodes nodes) {
+        for (ObjectObjectCursor<String, DiscoveryNode> node : nodes.getDataNodes()) {
+            if (lastSeenEphemeralIds.contains(node.value.getEphemeralId()) == false) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T> {
 
         InternalAsyncFetch(Logger logger, String type, ShardId shardId, Lister<? extends BaseNodesResponse<T>, T> action) {
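Not part of the patch: a minimal standalone sketch of the ephemeral-id bookkeeping that the GatewayAllocator change above relies on. It swaps the Elasticsearch types (DiscoveryNodes, ObjectObjectCursor) for plain collections, so the class and method names here (EphemeralIdTracker, hasNewNodes, remember) are illustrative only; the idea is simply that an unseen ephemeral id means a data node joined or rejoined since the last allocation round, which is the trigger for clearing the primary's cached store-files listing.

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustrative stand-in for the lastSeenEphemeralIds / hasNewNodes logic in GatewayAllocator.
class EphemeralIdTracker {

    private Set<String> lastSeenEphemeralIds = Collections.emptySet();

    // True if any currently known data node carries an ephemeral id we have not seen before,
    // i.e. a node joined (or restarted and rejoined with a fresh id) since the last check.
    boolean hasNewNodes(Collection<String> currentEphemeralIds) {
        for (String ephemeralId : currentEphemeralIds) {
            if (lastSeenEphemeralIds.contains(ephemeralId) == false) {
                return true;
            }
        }
        return false;
    }

    // Remembering the complete current set also (lazily) drops ids of nodes that have left.
    void remember(Collection<String> currentEphemeralIds) {
        lastSeenEphemeralIds = new HashSet<>(currentEphemeralIds);
    }
}

Tracking ephemeral ids rather than node ids means a restarted node also counts as "new", which is exactly the case where previously fetched primary data may be stale.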
diff --git a/server/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java
index b8b29dc722dfe..b4ce705735b7c 100644
--- a/server/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java
+++ b/server/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java
@@ -46,6 +46,7 @@ public class AsyncShardFetchTests extends ESTestCase {
     private final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(),
         Collections.singleton(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
     private final Response response1 = new Response(node1);
+    private final Response response1_2 = new Response(node1);
     private final Throwable failure1 = new Throwable("simulated failure 1");
     private final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Collections.emptyMap(),
         Collections.singleton(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
@@ -274,6 +275,85 @@ public void testTwoNodesAddedInBetween() throws Exception {
         assertThat(fetchData.getData().get(node2), sameInstance(response2));
     }
 
+    public void testClearCache() throws Exception {
+        DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).build();
+        test.addSimulation(node1.getId(), response1);
+
+        // must work also with no data
+        test.clearCacheForNode(node1.getId());
+
+        // no fetched data, request still on going
+        AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
+        assertThat(fetchData.hasData(), equalTo(false));
+        assertThat(test.reroute.get(), equalTo(0));
+
+        test.fireSimulationAndWait(node1.getId());
+        assertThat(test.reroute.get(), equalTo(1));
+
+        // verify we get back right data from node
+        fetchData = test.fetchData(nodes, emptySet());
+        assertThat(fetchData.hasData(), equalTo(true));
+        assertThat(fetchData.getData().size(), equalTo(1));
+        assertThat(fetchData.getData().get(node1), sameInstance(response1));
+
+        // second fetch gets same data
+        fetchData = test.fetchData(nodes, emptySet());
+        assertThat(fetchData.hasData(), equalTo(true));
+        assertThat(fetchData.getData().size(), equalTo(1));
+        assertThat(fetchData.getData().get(node1), sameInstance(response1));
+
+        test.clearCacheForNode(node1.getId());
+
+        // prepare next request
+        test.addSimulation(node1.getId(), response1_2);
+
+        // no fetched data, new request on going
+        fetchData = test.fetchData(nodes, emptySet());
+        assertThat(fetchData.hasData(), equalTo(false));
+
+        test.fireSimulationAndWait(node1.getId());
+        assertThat(test.reroute.get(), equalTo(2));
+
+        // verify we get new data back
+        fetchData = test.fetchData(nodes, emptySet());
+        assertThat(fetchData.hasData(), equalTo(true));
+        assertThat(fetchData.getData().size(), equalTo(1));
+        assertThat(fetchData.getData().get(node1), sameInstance(response1_2));
+    }
+
+    public void testConcurrentRequestAndClearCache() throws Exception {
+        DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).build();
+        test.addSimulation(node1.getId(), response1);
+
+        // no fetched data, request still on going
+        AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
+        assertThat(fetchData.hasData(), equalTo(false));
+        assertThat(test.reroute.get(), equalTo(0));
+
+        // clear cache while request is still on going, before it is processed
+        test.clearCacheForNode(node1.getId());
+
+        test.fireSimulationAndWait(node1.getId());
+        assertThat(test.reroute.get(), equalTo(1));
+
+        // prepare next request
+        test.addSimulation(node1.getId(), response1_2);
+
+        // verify still no fetched data, request still on going
+        fetchData = test.fetchData(nodes, emptySet());
+        assertThat(fetchData.hasData(), equalTo(false));
+
+        test.fireSimulationAndWait(node1.getId());
+        assertThat(test.reroute.get(), equalTo(2));
+
+        // verify we get new data back
+        fetchData = test.fetchData(nodes, emptySet());
+        assertThat(fetchData.hasData(), equalTo(true));
+        assertThat(fetchData.getData().size(), equalTo(1));
+        assertThat(fetchData.getData().get(node1), sameInstance(response1_2));
+
+    }
+
     static class TestFetch extends AsyncShardFetch<Response> {
 
         static class Entry {
diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java
new file mode 100644
index 0000000000000..60e2562f859a8
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.gateway;
+
+import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalSettingsPlugin;
+import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class ReplicaShardAllocatorIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class);
+    }
+
+    public void testRecentPrimaryInformation() throws Exception {
+        String indexName = "test";
+        String nodeWithPrimary = internalCluster().startNode();
+        assertAcked(
+            client().admin().indices().prepareCreate(indexName)
+                .setSettings(Settings.builder()
+                    .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                    .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+                    .put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 1.0f)
+                    .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
+                    .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "1ms")));
+        String nodeWithReplica = internalCluster().startDataOnlyNode();
+        Settings nodeWithReplicaSettings = internalCluster().dataPathSettings(nodeWithReplica);
+        ensureGreen(indexName);
+        indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(10, 100))
+            .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList()));
+        assertBusy(() -> {
+            SyncedFlushResponse syncedFlushResponse = client().admin().indices().prepareSyncedFlush(indexName).get();
+            assertThat(syncedFlushResponse.successfulShards(), equalTo(2));
+        });
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeWithReplica));
+        if (randomBoolean()) {
+            indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(10, 100))
+                .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList()));
+        }
"v")).collect(Collectors.toList())); + } + CountDownLatch blockRecovery = new CountDownLatch(1); + CountDownLatch recoveryStarted = new CountDownLatch(1); + MockTransportService transportServiceOnPrimary + = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary); + transportServiceOnPrimary.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.FILES_INFO.equals(action)) { + recoveryStarted.countDown(); + try { + blockRecovery.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + connection.sendRequest(requestId, action, request, options); + }); + String newNode = internalCluster().startDataOnlyNode(); + recoveryStarted.await(); + // destroy sync_id after the recovery on the new node has started + client().admin().indices().prepareFlush(indexName).setForce(true).get(); + // AllocationService only calls GatewayAllocator if there are unassigned shards + assertAcked(client().admin().indices().prepareCreate("dummy-index").setWaitForActiveShards(0) + .setSettings(Settings.builder().put("index.routing.allocation.require.attr", "not-found"))); + internalCluster().startDataOnlyNode(nodeWithReplicaSettings); + // need to wait for events to ensure the reroute has happened since we perform it async when a new node joins. + client().admin().cluster().prepareHealth(indexName).setWaitForYellowStatus().setWaitForEvents(Priority.LANGUID).get(); + blockRecovery.countDown(); + ensureGreen(indexName); + assertThat(internalCluster().nodesInclude(indexName), hasItem(newNode)); + transportServiceOnPrimary.clearAllRules(); + } +}