From c95e51cce645b0cc79f44cd94bb09d023269bcc4 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 16 Dec 2019 15:03:31 +0100 Subject: [PATCH] Fix Index Deletion during Snapshot Finalization (#50202) (#50228) * Fix Index Deletion during Snapshot Finalization (#50202) With #45689 making it so that index metadata is written after all shards have been snapshotted we can't delete indices that are part of the upcoming snapshot finalization any longer and it is not sufficient to check if all shards of an index have been snapshotted before deciding that it is safe to delete it. This change forbids deleting any index that is in the process of being snapshot to avoid issues during snapshot finalization. Relates #50200 (doesn't fully fix yet because we're not fixing the `partial=true` snapshot case here --- .../snapshots/SnapshotsService.java | 19 +- .../snapshots/SnapshotResiliencyTests.java | 172 +++++++++++++----- 2 files changed, 129 insertions(+), 62 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 24a13dc270b7e..bd1d0e7dafb5d 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1468,21 +1468,10 @@ public static Set snapshottingIndices(final ClusterState currentState, fi final Set indices = new HashSet<>(); for (final SnapshotsInProgress.Entry entry : snapshots.entries()) { if (entry.partial() == false) { - if (entry.state() == State.INIT) { - for (IndexId index : entry.indices()) { - IndexMetaData indexMetaData = currentState.metaData().index(index.getName()); - if (indexMetaData != null && indicesToCheck.contains(indexMetaData.getIndex())) { - indices.add(indexMetaData.getIndex()); - } - } - } else { - for (ObjectObjectCursor shard : entry.shards()) { - Index index = shard.key.getIndex(); - if (indicesToCheck.contains(index) - && shard.value.state().completed() == false - && currentState.getMetaData().index(index) != null) { - indices.add(index); - } + for (IndexId index : entry.indices()) { + IndexMetaData indexMetaData = currentState.metaData().index(index.getName()); + if (indexMetaData != null && indicesToCheck.contains(indexMetaData.getIndex())) { + indices.add(indexMetaData.getIndex()); } } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 45c66caa7900f..5e483981b14fe 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -26,6 +26,10 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.RequestValidators; import org.elasticsearch.action.StepListener; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.TransportCleanupRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction; @@ -75,6 +79,7 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -156,6 +161,7 @@ import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.fs.FsRepository; @@ -233,6 +239,15 @@ public void createServices() { @After public void verifyReposThenStopServices() { try { + clearDisruptionsAndAwaitSync(); + + final StepListener cleanupResponse = new StepListener<>(); + client().admin().cluster().cleanupRepository( + new CleanupRepositoryRequest("repo"), cleanupResponse); + final AtomicBoolean cleanedUp = new AtomicBoolean(false); + continueOrDie(cleanupResponse, r -> cleanedUp.set(true)); + + runUntil(cleanedUp::get, TimeUnit.MINUTES.toMillis(1L)); if (blobStoreContext != null) { blobStoreContext.forceConsistent(); } @@ -258,8 +273,8 @@ public void testSuccessfulSnapshotAndRestore() { final StepListener createSnapshotResponseListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { - final Runnable afterIndexing = () -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> { + final Runnable afterIndexing = () -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) .setWaitForCompletion(true).execute(createSnapshotResponseListener); if (documents == 0) { afterIndexing.run(); @@ -269,7 +284,7 @@ public void testSuccessfulSnapshotAndRestore() { bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i))); } final StepListener bulkResponseStepListener = new StepListener<>(); - masterNode.client.bulk(bulkRequest, bulkResponseStepListener); + client().bulk(bulkRequest, bulkResponseStepListener); continueOrDie(bulkResponseStepListener, bulkResponse -> { assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); assertEquals(documents, bulkResponse.getItems().length); @@ -281,16 +296,16 @@ public void testSuccessfulSnapshotAndRestore() { final StepListener deleteIndexListener = new StepListener<>(); continueOrDie(createSnapshotResponseListener, - createSnapshotResponse -> masterNode.client.admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener)); + createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener)); final StepListener restoreSnapshotResponseListener = new StepListener<>(); - continueOrDie(deleteIndexListener, ignored -> masterNode.client.admin().cluster().restoreSnapshot( + continueOrDie(deleteIndexListener, ignored -> client().admin().cluster().restoreSnapshot( new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true), restoreSnapshotResponseListener)); final StepListener searchResponseListener = new StepListener<>(); continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> { assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards()); - masterNode.client.search( + client().search( new SearchRequest(index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)), searchResponseListener); }); @@ -307,7 +322,7 @@ public void testSuccessfulSnapshotAndRestore() { SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); @@ -319,33 +334,34 @@ public void testSuccessfulSnapshotAndRestore() { public void testSnapshotWithNodeDisconnects() { final int dataNodes = randomIntBetween(2, 10); - setupTestCluster(randomFrom(1, 3, 5), dataNodes); + final int masterNodes = randomFrom(1, 3, 5); + setupTestCluster(masterNodes, dataNodes); String repoName = "repo"; String snapshotName = "snapshot"; final String index = "test"; final int shards = randomIntBetween(1, 10); - TestClusterNodes.TestClusterNode masterNode = - testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); final StepListener createSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { + continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> { for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) { scheduleNow(this::disconnectRandomDataNode); } if (randomBoolean()) { scheduleNow(() -> testClusterNodes.clearNetworkDisruptions()); } - masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName).execute(createSnapshotResponseStepListener); + testClusterNodes.randomMasterNodeSafe().client.admin().cluster() + .prepareCreateSnapshot(repoName, snapshotName).execute(createSnapshotResponseStepListener); }); continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> { for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) { scheduleNow(this::disconnectOrRestartDataNode); } - final boolean disconnectedMaster = randomBoolean(); + // Only disconnect master if we have more than a single master and can simulate a failover + final boolean disconnectedMaster = randomBoolean() && masterNodes > 1; if (disconnectedMaster) { scheduleNow(this::disconnectOrRestartMasterNode); } @@ -368,7 +384,7 @@ public void testSnapshotWithNodeDisconnects() { SnapshotsInProgress finalSnapshotsInProgress = randomMaster.clusterService.state().custom(SnapshotsInProgress.TYPE); assertThat(finalSnapshotsInProgress.entries(), empty()); final Repository repository = randomMaster.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); } @@ -385,18 +401,18 @@ public void testConcurrentSnapshotCreateAndDelete() { final StepListener createSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), - createIndexResponse -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + continueOrDie(createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) .execute(createSnapshotResponseStepListener)); final StepListener deleteSnapshotStepListener = new StepListener<>(); - continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().deleteSnapshot( + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> client().admin().cluster().deleteSnapshot( new DeleteSnapshotRequest(repoName, snapshotName), deleteSnapshotStepListener)); final StepListener createAnotherSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> masterNode.client.admin().cluster() + continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> client().admin().cluster() .prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createAnotherSnapshotResponseStepListener)); continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse -> assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS)); @@ -407,7 +423,7 @@ public void testConcurrentSnapshotCreateAndDelete() { SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); @@ -417,6 +433,50 @@ public void testConcurrentSnapshotCreateAndDelete() { assertEquals(0, snapshotInfo.failedShards()); } + public void testConcurrentSnapshotDeleteAndDeleteIndex() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + String snapshotName = "snapshot"; + final String index = "test"; + + TestClusterNodes.TestClusterNode masterNode = + testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); + + final StepListener> createIndicesListener = new StepListener<>(); + + continueOrDie(createRepoAndIndex(repoName, index, 1), createIndexResponse -> { + // create a few more indices to make it more likely that the subsequent index delete operation happens before snapshot + // finalization + final int indices = randomIntBetween(5, 20); + final GroupedActionListener listener = new GroupedActionListener<>(createIndicesListener, indices); + for (int i = 0; i < indices; ++i) { + client().admin().indices().create(new CreateIndexRequest("index-" + i), listener); + } + }); + + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(createIndicesListener, createIndexResponses -> + client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(false) + .execute(createSnapshotResponseStepListener)); + + continueOrDie(createSnapshotResponseStepListener, + createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), noopListener())); + + deterministicTaskQueue.runAllRunnableTasks(); + + SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); + assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); + final Repository repository = masterNode.repositoriesService.repository(repoName); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); + assertThat(snapshotIds, hasSize(1)); + + final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertEquals(0, snapshotInfo.failedShards()); + } + /** * Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently * deleting a snapshot. @@ -438,8 +498,8 @@ public void testSnapshotPrimaryRelocations() { final StepListener clusterStateResponseStepListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), - createIndexResponse -> masterAdminClient.cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener)); + continueOrDie(createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin().cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener)); continueOrDie(clusterStateResponseStepListener, clusterStateResponse -> { final ShardRouting shardToRelocate = clusterStateResponse.getState().routingTable().allShards(index).get(0); @@ -490,8 +550,8 @@ public void run() { final SnapshotsInProgress finalSnapshotsInProgress = testClusterNodes.randomDataNodeSafe() .clusterService.state().custom(SnapshotsInProgress.TYPE); assertThat(finalSnapshotsInProgress.entries(), empty()); - final Repository repository = masterNode.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + final Repository repository = testClusterNodes.randomMasterNodeSafe().repositoriesService.repository(repoName); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0))); } @@ -509,19 +569,18 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { final StepListener createSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { + continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> { final AtomicBoolean initiatedSnapshot = new AtomicBoolean(false); for (int i = 0; i < documents; ++i) { // Index a few documents with different field names so we trigger a dynamic mapping update for each of them - masterNode.client.bulk( - new BulkRequest().add(new IndexRequest(index).source(Collections.singletonMap("foo" + i, "bar"))) + client().bulk(new BulkRequest().add(new IndexRequest(index).source(Collections.singletonMap("foo" + i, "bar"))) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), assertNoFailureListener( bulkResponse -> { assertFalse("Failures in bulkresponse: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); if (initiatedSnapshot.compareAndSet(false, true)) { - masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) - .setWaitForCompletion(true).execute(createSnapshotResponseStepListener); + client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true) + .execute(createSnapshotResponseStepListener); } })); } @@ -531,7 +590,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { final StepListener restoreSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().restoreSnapshot( + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> client().admin().cluster().restoreSnapshot( new RestoreSnapshotRequest(repoName, snapshotName) .renamePattern(index).renameReplacement(restoredIndex).waitForCompletion(true), restoreSnapshotResponseStepListener)); @@ -539,8 +598,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { continueOrDie(restoreSnapshotResponseStepListener, restoreSnapshotResponse -> { assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards()); - masterNode.client.search( - new SearchRequest(restoredIndex).source(new SearchSourceBuilder().size(documents).trackTotalHits(true)), + client().search(new SearchRequest(restoredIndex).source(new SearchSourceBuilder().size(documents).trackTotalHits(true)), searchResponseStepListener); }); @@ -564,7 +622,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); @@ -574,18 +632,19 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { assertEquals(0, snapshotInfo.failedShards()); } - private StepListener createRepoAndIndex(TestClusterNodes.TestClusterNode masterNode, String repoName, - String index, int shards) { - final AdminClient adminClient = masterNode.client.admin(); + private RepositoryData getRepositoryData(Repository repository) { + return repository.getRepositoryData(); + } + private StepListener createRepoAndIndex(String repoName, String index, int shards) { final StepListener createRepositoryListener = new StepListener<>(); - adminClient.cluster().preparePutRepository(repoName).setType(FsRepository.TYPE) + client().admin().cluster().preparePutRepository(repoName).setType(FsRepository.TYPE) .setSettings(Settings.builder().put("location", randomAlphaOfLength(10))).execute(createRepositoryListener); final StepListener createIndexResponseStepListener = new StepListener<>(); - continueOrDie(createRepositoryListener, acknowledgedResponse -> adminClient.indices().create( + continueOrDie(createRepositoryListener, acknowledgedResponse -> client().admin().indices().create( new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(shards)), createIndexResponseStepListener)); @@ -594,11 +653,7 @@ private StepListener createRepoAndIndex(TestClusterNodes.Te private void clearDisruptionsAndAwaitSync() { testClusterNodes.clearNetworkDisruptions(); - runUntil(() -> { - final List versions = testClusterNodes.nodes.values().stream() - .map(n -> n.clusterService.state().version()).distinct().collect(Collectors.toList()); - return versions.size() == 1L; - }, TimeUnit.MINUTES.toMillis(1L)); + stabilize(); } private void disconnectOrRestartDataNode() { @@ -635,15 +690,25 @@ private void startCluster() { .filter(DiscoveryNode::isMasterNode).map(DiscoveryNode::getId).collect(Collectors.toSet())); testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()).forEach( testClusterNode -> testClusterNode.coordinator.setInitialConfiguration(votingConfiguration)); + // Connect all nodes to each other + testClusterNodes.nodes.values().forEach(node -> testClusterNodes.nodes.values().forEach( + n -> n.transportService.connectToNode(node.node, null, + ActionTestUtils.assertNoFailureListener(c -> logger.info("--> Connected [{}] to [{}]", n.node, node.node))))); + stabilize(); + } + private void stabilize() { runUntil( () -> { - List masterNodeIds = testClusterNodes.nodes.values().stream() - .map(node -> node.clusterService.state().nodes().getMasterNodeId()) - .distinct().collect(Collectors.toList()); - return masterNodeIds.size() == 1 && masterNodeIds.contains(null) == false; + final Collection clusterStates = + testClusterNodes.nodes.values().stream().map(node -> node.clusterService.state()).collect(Collectors.toList()); + final Set masterNodeIds = clusterStates.stream() + .map(clusterState -> clusterState.nodes().getMasterNodeId()).collect(Collectors.toSet()); + final Set terms = clusterStates.stream().map(ClusterState::term).collect(Collectors.toSet()); + final List versions = clusterStates.stream().map(ClusterState::version).distinct().collect(Collectors.toList()); + return versions.size() == 1 && masterNodeIds.size() == 1 && masterNodeIds.contains(null) == false && terms.size() == 1; }, - TimeUnit.SECONDS.toMillis(30L) + TimeUnit.MINUTES.toMillis(1L) ); } @@ -689,6 +754,16 @@ private static ActionListener noopListener() { return ActionListener.wrap(() -> {}); } + public NodeClient client() { + // Select from sorted list of nodes + final List nodes = testClusterNodes.nodes.values().stream() + .filter(n -> testClusterNodes.disconnectedNodes.contains(n.node.getName()) == false) + .sorted(Comparator.comparing(n -> n.node.getName())).collect(Collectors.toList()); + if (nodes.isEmpty()) { + throw new AssertionError("No nodes available"); + } + return randomFrom(nodes).client; + } /** * Create a {@link Environment} with random path.home and path.repo **/ @@ -765,6 +840,7 @@ public TestClusterNode randomMasterNodeSafe() { public Optional randomMasterNode() { // Select from sorted list of data-nodes here to not have deterministic behaviour final List masterNodes = testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()) + .filter(n -> disconnectedNodes.contains(n.node.getName()) == false) .sorted(Comparator.comparing(n -> n.node.getName())).collect(Collectors.toList()); return masterNodes.isEmpty() ? Optional.empty() : Optional.of(randomFrom(masterNodes)); } @@ -1108,6 +1184,8 @@ searchTransportService, new SearchPhaseController(searchService::createReduceCon transportService, clusterService, repositoriesService, threadPool, actionFilters, indexNameExpressionResolver )); + actions.put(CleanupRepositoryAction.INSTANCE, new TransportCleanupRepositoryAction(transportService, clusterService, + repositoriesService, threadPool, actionFilters, indexNameExpressionResolver)); actions.put(CreateSnapshotAction.INSTANCE, new TransportCreateSnapshotAction( transportService, clusterService, threadPool,