diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index be5128cf43ba5..6be0cf7dd31ad 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -372,8 +372,8 @@ private void beginSnapshot(final ClusterState clusterState, return; } clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { - boolean accepted = false; - SnapshotsInProgress.Entry updatedSnapshot; + + SnapshotsInProgress.Entry endSnapshot; String failure = null; @Override @@ -381,17 +381,23 @@ public ClusterState execute(ClusterState currentState) { SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); List entries = new ArrayList<>(); for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - if (entry.snapshot().equals(snapshot.snapshot()) && entry.state() != State.ABORTED) { - // Replace the snapshot that was just created + if (entry.snapshot().equals(snapshot.snapshot()) == false) { + entries.add(entry); + continue; + } + + if (entry.state() != State.ABORTED) { + // Replace the snapshot that was just intialized ImmutableOpenMap shards = shards(currentState, entry.indices()); if (!partial) { Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData()); Set missing = indicesWithMissingShards.v1(); Set closed = indicesWithMissingShards.v2(); if (missing.isEmpty() == false || closed.isEmpty() == false) { - StringBuilder failureMessage = new StringBuilder(); - updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards); - entries.add(updatedSnapshot); + endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards); + entries.add(endSnapshot); + + final StringBuilder failureMessage = new StringBuilder(); if (missing.isEmpty() == false) { failureMessage.append("Indices don't have primary shards "); failureMessage.append(missing); @@ -407,13 +413,16 @@ public ClusterState execute(ClusterState currentState) { continue; } } - updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards); + SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards); entries.add(updatedSnapshot); - if (!completed(shards.values())) { - accepted = true; + if (completed(shards.values())) { + endSnapshot = updatedSnapshot; } } else { - entries.add(entry); + assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization"; + failure = "snapshot was aborted during initialization"; + endSnapshot = entry; + entries.add(endSnapshot); } } return ClusterState.builder(currentState) @@ -448,8 +457,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS // We should end snapshot only if 1) we didn't accept it for processing (which happens when there // is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should // go ahead and continue working on this snapshot rather then end here. - if (!accepted && updatedSnapshot != null) { - endSnapshot(updatedSnapshot, failure); + if (endSnapshot != null) { + endSnapshot(endSnapshot, failure); } } }); @@ -749,6 +758,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { } entries.add(updatedSnapshot); } else if (snapshot.state() == State.INIT && newMaster) { + changed = true; + // Mark the snapshot as aborted as it failed to start from the previous master + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards()); + entries.add(updatedSnapshot); + // Clean up the snapshot that failed to start from the old master deleteSnapshot(snapshot.snapshot(), new DeleteSnapshotListener() { @Override @@ -934,7 +948,7 @@ private Tuple, Set> indicesWithMissingShards(ImmutableOpenMa * * @param entry snapshot */ - void endSnapshot(SnapshotsInProgress.Entry entry) { + void endSnapshot(final SnapshotsInProgress.Entry entry) { endSnapshot(entry, null); } @@ -1142,24 +1156,26 @@ public ClusterState execute(ClusterState currentState) throws Exception { } else { // This snapshot is currently running - stopping shards first waitForSnapshot = true; - ImmutableOpenMap shards; - if (snapshotEntry.state() == State.STARTED && snapshotEntry.shards() != null) { - // snapshot is currently running - stop started shards - ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); + + final ImmutableOpenMap shards; + + final State state = snapshotEntry.state(); + if (state == State.INIT) { + // snapshot is still initializing, mark it as aborted + shards = snapshotEntry.shards(); + + } else if (state == State.STARTED) { + // snapshot is started - mark every non completed shard as aborted + final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); for (ObjectObjectCursor shardEntry : snapshotEntry.shards()) { ShardSnapshotStatus status = shardEntry.value; - if (!status.state().completed()) { - shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED, - "aborted by snapshot deletion")); - } else { - shardsBuilder.put(shardEntry.key, status); + if (status.state().completed() == false) { + status = new ShardSnapshotStatus(status.nodeId(), State.ABORTED, "aborted by snapshot deletion"); } + shardsBuilder.put(shardEntry.key, status); } shards = shardsBuilder.build(); - } else if (snapshotEntry.state() == State.INIT) { - // snapshot hasn't started yet - end it - shards = snapshotEntry.shards(); - endSnapshot(snapshotEntry); + } else { boolean hasUncompletedShards = false; // Cleanup in case a node gone missing and snapshot wasn't updated for some reason @@ -1176,7 +1192,8 @@ public ClusterState execute(ClusterState currentState) throws Exception { logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes"); return currentState; } else { - // no shards to wait for - finish the snapshot + // no shards to wait for but a node is gone - this is the only case + // where we force to finish the snapshot logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately"); shards = snapshotEntry.shards(); endSnapshot(snapshotEntry); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 1cfafc1a228fe..0b6f2846d8f57 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -3060,6 +3060,7 @@ public void testGetSnapshotsFromIndexBlobOnly() throws Exception { } } + @TestLogging("org.elasticsearch.snapshots:TRACE") public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception { final Client client = client(); @@ -3075,7 +3076,7 @@ public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception { for (int i = 0; i < nbDocs; i++) { index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i); } - refresh(); + flushAndRefresh("test-idx"); assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs)); // Create a snapshot