From 9b6d37a77097550939d9cb5aabc0ec14487173b0 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 22 Dec 2017 15:21:21 +0100 Subject: [PATCH] Do not start snapshots that are deleted during initialization (#27931) When a new snapshot is created it is added to the cluster state as a snapshot-in-progress in INIT state, and the initialization is kicked off in a new runnable task by SnapshotService.beginSnapshot(). The initialization writes multiple files before updating the cluster state to change the snapshot-in-progress to STARTED state. This leaves a short window in which the snapshot could be deleted (let's say, because the snapshot is stuck in INIT or because it takes too much time to upload all the initialization files for all snapshotted indices). If the INIT snapshot is deleted, the snapshot-in-progress becomes ABORTED but once the initialization in SnapshotService.beginSnapshot() finished it is change back to STARTED state again. This commit avoids an ABORTED snapshot to be started if it has been deleted during initialization. It also adds a test that would have failed with the previous behavior, and changes few method names here and there. --- .../snapshots/SnapshotException.java | 2 +- .../snapshots/SnapshotMissingException.java | 6 +- .../snapshots/SnapshotShardsService.java | 161 +++++++++++------- .../snapshots/SnapshotsService.java | 12 +- .../SharedClusterSnapshotRestoreIT.java | 71 ++++++++ 5 files changed, 181 insertions(+), 71 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotException.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotException.java index 0acd73d62eab9..d389ed634f3af 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotException.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotException.java @@ -66,7 +66,7 @@ public SnapshotException(final String repositoryName, final String snapshotName, } public SnapshotException(final String repositoryName, final String snapshotName, final String msg, final Throwable cause) { - super("[" + repositoryName + ":" + snapshotName + "]" + msg, cause); + super("[" + repositoryName + ":" + snapshotName + "] " + msg, cause); this.repositoryName = repositoryName; this.snapshotName = snapshotName; } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotMissingException.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotMissingException.java index b7f2c6af4a1da..5f0979e38d8b9 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotMissingException.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotMissingException.java @@ -30,15 +30,15 @@ public class SnapshotMissingException extends SnapshotException { public SnapshotMissingException(final String repositoryName, final SnapshotId snapshotId, final Throwable cause) { - super(repositoryName, snapshotId, " is missing", cause); + super(repositoryName, snapshotId, "is missing", cause); } public SnapshotMissingException(final String repositoryName, final SnapshotId snapshotId) { - super(repositoryName, snapshotId, " is missing"); + super(repositoryName, snapshotId, "is missing"); } public SnapshotMissingException(final String repositoryName, final String snapshotName) { - super(repositoryName, snapshotName, " is missing"); + super(repositoryName, snapshotName, "is missing"); } public SnapshotMissingException(StreamInput in) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 8d9d1fad5157f..e0f087d8226ec 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -146,7 +147,6 @@ protected void doStop() { } finally { shutdownLock.unlock(); } - } @Override @@ -157,14 +157,16 @@ protected void doClose() { @Override public void applyClusterState(ClusterChangedEvent event) { try { - SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE); - SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE); - - if ((prev == null && curr != null) || (prev != null && prev.equals(curr) == false)) { + SnapshotsInProgress previousSnapshots = event.previousState().custom(SnapshotsInProgress.TYPE); + SnapshotsInProgress currentSnapshots = event.state().custom(SnapshotsInProgress.TYPE); + if ((previousSnapshots == null && currentSnapshots != null) + || (previousSnapshots != null && previousSnapshots.equals(currentSnapshots) == false)) { processIndexShardSnapshots(event); } - String masterNodeId = event.state().nodes().getMasterNodeId(); - if (masterNodeId != null && masterNodeId.equals(event.previousState().nodes().getMasterNodeId()) == false) { + + String previousMasterNodeId = event.previousState().nodes().getMasterNodeId(); + String currentMasterNodeId = event.state().nodes().getMasterNodeId(); + if (currentMasterNodeId != null && currentMasterNodeId.equals(previousMasterNodeId) == false) { syncShardStatsOnNewMaster(event); } @@ -281,17 +283,18 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { snapshotStatus.abort(); break; case FINALIZE: - logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, letting it finish", entry.snapshot(), shard.key); + logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " + + "letting it finish", entry.snapshot(), shard.key); break; case DONE: - logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshot(), shard.key); - updateIndexShardSnapshotStatus(entry.snapshot(), shard.key, - new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); + logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " + + "updating status on the master", entry.snapshot(), shard.key); + notifySuccessfulSnapshotShard(entry.snapshot(), shard.key, localNodeId, masterNode); break; case FAILURE: - logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshot(), shard.key); - updateIndexShardSnapshotStatus(entry.snapshot(), shard.key, - new ShardSnapshotStatus(localNodeId, State.FAILED, snapshotStatus.failure()), masterNode); + logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + + "updating status on the master", entry.snapshot(), shard.key); + notifyFailedSnapshotShard(entry.snapshot(), shard.key, localNodeId, snapshotStatus.failure(), masterNode); break; default: throw new IllegalStateException("Unknown snapshot shard stage " + snapshotStatus.stage()); @@ -320,34 +323,47 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { if (newSnapshots.isEmpty() == false) { Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); for (final Map.Entry> entry : newSnapshots.entrySet()) { - Map indicesMap = snapshotIndices.get(entry.getKey()); + final Snapshot snapshot = entry.getKey(); + final Map indicesMap = snapshotIndices.get(snapshot); assert indicesMap != null; + for (final Map.Entry shardEntry : entry.getValue().entrySet()) { final ShardId shardId = shardEntry.getKey(); - try { - final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); - final IndexId indexId = indicesMap.get(shardId.getIndexName()); - assert indexId != null; - executor.execute(new AbstractRunnable() { - @Override - public void doRun() { - snapshot(indexShard, entry.getKey(), indexId, shardEntry.getValue()); - updateIndexShardSnapshotStatus(entry.getKey(), shardId, - new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); - } + final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); + final IndexId indexId = indicesMap.get(shardId.getIndexName()); + assert indexId != null; + executor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to create snapshot", shardId, entry.getKey()), e); - updateIndexShardSnapshotStatus(entry.getKey(), shardId, - new ShardSnapshotStatus(localNodeId, State.FAILED, ExceptionsHelper.detailedMessage(e)), masterNode); - } + final SetOnce failure = new SetOnce<>(); - }); - } catch (Exception e) { - updateIndexShardSnapshotStatus(entry.getKey(), shardId, - new ShardSnapshotStatus(localNodeId, State.FAILED, ExceptionsHelper.detailedMessage(e)), masterNode); - } + @Override + public void doRun() { + snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); + } + + @Override + public void onFailure(Exception e) { + logger.warn((Supplier) () -> + new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); + failure.set(e); + } + + @Override + public void onRejection(Exception e) { + failure.set(e); + } + + @Override + public void onAfter() { + final Exception exception = failure.get(); + if (exception != null) { + final String failure = ExceptionsHelper.detailedMessage(exception); + notifyFailedSnapshotShard(snapshot, shardId, localNodeId, failure, masterNode); + } else { + notifySuccessfulSnapshotShard(snapshot, shardId, localNodeId, masterNode); + } + } + }); } } } @@ -360,34 +376,36 @@ public void onFailure(Exception e) { * @param snapshotStatus snapshot status */ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId, final IndexShardSnapshotStatus snapshotStatus) { - Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository()); - ShardId shardId = indexShard.shardId(); - if (!indexShard.routingEntry().primary()) { + final ShardId shardId = indexShard.shardId(); + if (indexShard.routingEntry().primary() == false) { throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary"); } if (indexShard.routingEntry().relocating()) { // do not snapshot when in the process of relocation of primaries so we won't get conflicts throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating"); } - if (indexShard.state() == IndexShardState.CREATED || indexShard.state() == IndexShardState.RECOVERING) { + + final IndexShardState indexShardState = indexShard.state(); + if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) { // shard has just been created, or still recovering throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet"); } + final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository()); try { // we flush first to make sure we get the latest writes snapshotted try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) { repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus); if (logger.isDebugEnabled()) { - StringBuilder sb = new StringBuilder(); - sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n"); + StringBuilder details = new StringBuilder(); + details.append(" index : version [").append(snapshotStatus.indexVersion()); + details.append("], number_of_files [").append(snapshotStatus.numberOfFiles()); + details.append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n"); logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, repository, - TimeValue.timeValueMillis(snapshotStatus.time()), sb); + TimeValue.timeValueMillis(snapshotStatus.time()), details); } } - } catch (SnapshotFailedEngineException e) { - throw e; - } catch (IndexShardSnapshotFailedException e) { + } catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e) { throw e; } catch (Exception e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to snapshot", e); @@ -402,6 +420,7 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { if (snapshotsInProgress == null) { return; } + final String localNodeId = event.state().nodes().getLocalNodeId(); final DiscoveryNode masterNode = event.state().nodes().getMasterNode(); for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { @@ -417,15 +436,16 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { // Master knows about the shard and thinks it has not completed if (localShardStatus.stage() == Stage.DONE) { // but we think the shard is done - we need to make new master know that the shard is done - logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, updating status on the master", snapshot.snapshot(), shardId); - updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId, - new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); + logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " + + "updating status on the master", snapshot.snapshot(), shardId); + notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localNodeId, masterNode); + } else if (localShard.getValue().stage() == Stage.FAILURE) { // but we think the shard failed - we need to make new master know that the shard failed - logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, updating status on master", snapshot.snapshot(), shardId); - updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId, - new ShardSnapshotStatus(localNodeId, State.FAILED, localShardStatus.failure()), masterNode); - + logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, " + + "updating status on master", snapshot.snapshot(), shardId); + final String failure = localShardStatus.failure(); + notifyFailedSnapshotShard(snapshot.snapshot(), shardId, localNodeId, failure, masterNode); } } } @@ -445,7 +465,6 @@ private SnapshotShards(Map shards) { } } - /** * Internal request that is used to send changes in snapshot status to master */ @@ -498,15 +517,33 @@ public String toString() { } } - /** - * Updates the shard status - */ - public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) { - UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); + /** Notify the master node that the given shard has been successfully snapshotted **/ + void notifySuccessfulSnapshotShard(final Snapshot snapshot, + final ShardId shardId, + final String localNodeId, + final DiscoveryNode masterNode) { + sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); + } + + /** Notify the master node that the given shard failed to be snapshotted **/ + void notifyFailedSnapshotShard(final Snapshot snapshot, + final ShardId shardId, + final String localNodeId, + final String failure, + final DiscoveryNode masterNode) { + sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(localNodeId, State.FAILED, failure), masterNode); + } + + /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */ + void sendSnapshotShardUpdate(final Snapshot snapshot, + final ShardId shardId, + final ShardSnapshotStatus status, + final DiscoveryNode masterNode) { try { - transportService.sendRequest(master, UPDATE_SNAPSHOT_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); + final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); + transportService.sendRequest(masterNode, UPDATE_SNAPSHOT_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", request.snapshot(), request.status()), e); + logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e); } } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 2085368744af9..be5128cf43ba5 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -96,7 +96,7 @@ * kicks in and initializes the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state *
  • Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(ClusterChangedEvent)} method
  • - *
  • Once shard snapshot is created data node updates state of the shard in the cluster state using the {@link SnapshotShardsService#updateIndexShardSnapshotStatus} method
  • + *
  • Once shard snapshot is created data node updates state of the shard in the cluster state using the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus, DiscoveryNode)} method
  • *
  • When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot as completed
  • *
  • After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository, * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state
  • @@ -381,7 +381,7 @@ public ClusterState execute(ClusterState currentState) { SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); List entries = new ArrayList<>(); for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - if (entry.snapshot().equals(snapshot.snapshot())) { + if (entry.snapshot().equals(snapshot.snapshot()) && entry.state() != State.ABORTED) { // Replace the snapshot that was just created ImmutableOpenMap shards = shards(currentState, entry.indices()); if (!partial) { @@ -392,11 +392,11 @@ public ClusterState execute(ClusterState currentState) { StringBuilder failureMessage = new StringBuilder(); updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards); entries.add(updatedSnapshot); - if (missing.isEmpty() == false ) { + if (missing.isEmpty() == false) { failureMessage.append("Indices don't have primary shards "); failureMessage.append(missing); } - if (closed.isEmpty() == false ) { + if (closed.isEmpty() == false) { if (failureMessage.length() > 0) { failureMessage.append("; "); } @@ -1235,7 +1235,7 @@ public void onSnapshotFailure(Snapshot failedSnapshot, Exception e) { "could not be found after failing to abort.", smex.getSnapshotName()), e); listener.onFailure(new SnapshotException(snapshot, - "Tried deleting in-progress snapshot [{}], but it " + + "Tried deleting in-progress snapshot [" + smex.getSnapshotName() + "], but it " + "could not be found after failing to abort.", smex)); } }); @@ -1290,6 +1290,8 @@ private void deleteSnapshotFromRepository(final Snapshot snapshot, @Nullable fin try { Repository repository = repositoriesService.repository(snapshot.getRepository()); repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId); + logger.info("snapshot [{}] deleted", snapshot); + removeSnapshotDeletionFromClusterState(snapshot, null, listener); } catch (Exception ex) { removeSnapshotDeletionFromClusterState(snapshot, ex, listener); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 650acb369b99f..1cfafc1a228fe 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage; @@ -45,6 +46,7 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; @@ -99,6 +101,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -3057,6 +3060,74 @@ public void testGetSnapshotsFromIndexBlobOnly() throws Exception { } } + public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception { + final Client client = client(); + + // Blocks on initialization + assertAcked(client.admin().cluster().preparePutRepository("repository") + .setType("mock").setSettings(Settings.builder() + .put("location", randomRepoPath()) + .put("block_on_init", true) + )); + + createIndex("test-idx"); + final int nbDocs = scaledRandomIntBetween(100, 500); + for (int i = 0; i < nbDocs; i++) { + index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs)); + + // Create a snapshot + client.admin().cluster().prepareCreateSnapshot("repository", "snap").execute(); + waitForBlock(internalCluster().getMasterName(), "repository", TimeValue.timeValueMinutes(1)); + boolean blocked = true; + + // Snapshot is initializing (and is blocked at this stage) + SnapshotsStatusResponse snapshotsStatus = client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get(); + assertThat(snapshotsStatus.getSnapshots().iterator().next().getState(), equalTo(State.INIT)); + + final List states = new CopyOnWriteArrayList<>(); + final ClusterStateListener listener = event -> { + SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); + for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { + if ("snap".equals(entry.snapshot().getSnapshotId().getName())) { + states.add(entry.state()); + } + } + }; + + try { + // Record the upcoming states of the snapshot on all nodes + internalCluster().getInstances(ClusterService.class).forEach(clusterService -> clusterService.addListener(listener)); + + // Delete the snapshot while it is being initialized + ActionFuture delete = client.admin().cluster().prepareDeleteSnapshot("repository", "snap").execute(); + + // The deletion must set the snapshot in the ABORTED state + assertBusy(() -> { + SnapshotsStatusResponse status = client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get(); + assertThat(status.getSnapshots().iterator().next().getState(), equalTo(State.ABORTED)); + }); + + // Now unblock the repository + unblockNode("repository", internalCluster().getMasterName()); + blocked = false; + + assertAcked(delete.get()); + expectThrows(SnapshotMissingException.class, () -> + client.admin().cluster().prepareGetSnapshots("repository").setSnapshots("snap").get()); + + assertFalse("Expecting snapshot state to be updated", states.isEmpty()); + assertFalse("Expecting snapshot to be aborted and not started at all", states.contains(State.STARTED)); + } finally { + internalCluster().getInstances(ClusterService.class).forEach(clusterService -> clusterService.removeListener(listener)); + if (blocked) { + unblockNode("repository", internalCluster().getMasterName()); + } + } + } + private void verifySnapshotInfo(final GetSnapshotsResponse response, final Map> indicesPerSnapshot) { for (SnapshotInfo snapshotInfo : response.getSnapshots()) { final List expected = snapshotInfo.indices();