From 6af4bed92d4501100817eb11eda51da35ccfc872 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 8 Jan 2018 15:03:40 +0100 Subject: [PATCH] Avoid concurrent snapshot finalizations when deleting an INIT snapshot (#28078) This commit removes the finalization of a snapshot by the snapshot deletion request. This way, the deletion marks the snapshot as ABORTED in cluster state and waits for the snapshot completion. It is the responsability of the snapshot execution to detect the abortion and terminates itself correctly. This avoids concurrent snapshot finalizations and also ordinates the operations: the deletion aborts the snapshot and waits for the snapshot completion, the creation detects the abortion and stops by itself and finalizes the snapshot, then the deletion resumes and continues the deletion process. --- .../snapshots/SnapshotsService.java | 73 ++++++++++++------- .../SharedClusterSnapshotRestoreIT.java | 3 +- 2 files changed, 47 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index be5128cf43ba5..6be0cf7dd31ad 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -372,8 +372,8 @@ private void beginSnapshot(final ClusterState clusterState, return; } clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { - boolean accepted = false; - SnapshotsInProgress.Entry updatedSnapshot; + + SnapshotsInProgress.Entry endSnapshot; String failure = null; @Override @@ -381,17 +381,23 @@ public ClusterState execute(ClusterState currentState) { SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); List entries = new ArrayList<>(); for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - if (entry.snapshot().equals(snapshot.snapshot()) && entry.state() != State.ABORTED) { - // Replace the snapshot that was just created + if (entry.snapshot().equals(snapshot.snapshot()) == false) { + entries.add(entry); + continue; + } + + if (entry.state() != State.ABORTED) { + // Replace the snapshot that was just intialized ImmutableOpenMap shards = shards(currentState, entry.indices()); if (!partial) { Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData()); Set missing = indicesWithMissingShards.v1(); Set closed = indicesWithMissingShards.v2(); if (missing.isEmpty() == false || closed.isEmpty() == false) { - StringBuilder failureMessage = new StringBuilder(); - updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards); - entries.add(updatedSnapshot); + endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards); + entries.add(endSnapshot); + + final StringBuilder failureMessage = new StringBuilder(); if (missing.isEmpty() == false) { failureMessage.append("Indices don't have primary shards "); failureMessage.append(missing); @@ -407,13 +413,16 @@ public ClusterState execute(ClusterState currentState) { continue; } } - updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards); + SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards); entries.add(updatedSnapshot); - if (!completed(shards.values())) { - accepted = true; + if (completed(shards.values())) { + endSnapshot = updatedSnapshot; } } else { - entries.add(entry); + assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization"; + failure = "snapshot was aborted during initialization"; + endSnapshot = entry; + entries.add(endSnapshot); } } return ClusterState.builder(currentState) @@ -448,8 +457,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS // We should end snapshot only if 1) we didn't accept it for processing (which happens when there // is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should // go ahead and continue working on this snapshot rather then end here. - if (!accepted && updatedSnapshot != null) { - endSnapshot(updatedSnapshot, failure); + if (endSnapshot != null) { + endSnapshot(endSnapshot, failure); } } }); @@ -749,6 +758,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { } entries.add(updatedSnapshot); } else if (snapshot.state() == State.INIT && newMaster) { + changed = true; + // Mark the snapshot as aborted as it failed to start from the previous master + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards()); + entries.add(updatedSnapshot); + // Clean up the snapshot that failed to start from the old master deleteSnapshot(snapshot.snapshot(), new DeleteSnapshotListener() { @Override @@ -934,7 +948,7 @@ private Tuple, Set> indicesWithMissingShards(ImmutableOpenMa * * @param entry snapshot */ - void endSnapshot(SnapshotsInProgress.Entry entry) { + void endSnapshot(final SnapshotsInProgress.Entry entry) { endSnapshot(entry, null); } @@ -1142,24 +1156,26 @@ public ClusterState execute(ClusterState currentState) throws Exception { } else { // This snapshot is currently running - stopping shards first waitForSnapshot = true; - ImmutableOpenMap shards; - if (snapshotEntry.state() == State.STARTED && snapshotEntry.shards() != null) { - // snapshot is currently running - stop started shards - ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); + + final ImmutableOpenMap shards; + + final State state = snapshotEntry.state(); + if (state == State.INIT) { + // snapshot is still initializing, mark it as aborted + shards = snapshotEntry.shards(); + + } else if (state == State.STARTED) { + // snapshot is started - mark every non completed shard as aborted + final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); for (ObjectObjectCursor shardEntry : snapshotEntry.shards()) { ShardSnapshotStatus status = shardEntry.value; - if (!status.state().completed()) { - shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED, - "aborted by snapshot deletion")); - } else { - shardsBuilder.put(shardEntry.key, status); + if (status.state().completed() == false) { + status = new ShardSnapshotStatus(status.nodeId(), State.ABORTED, "aborted by snapshot deletion"); } + shardsBuilder.put(shardEntry.key, status); } shards = shardsBuilder.build(); - } else if (snapshotEntry.state() == State.INIT) { - // snapshot hasn't started yet - end it - shards = snapshotEntry.shards(); - endSnapshot(snapshotEntry); + } else { boolean hasUncompletedShards = false; // Cleanup in case a node gone missing and snapshot wasn't updated for some reason @@ -1176,7 +1192,8 @@ public ClusterState execute(ClusterState currentState) throws Exception { logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes"); return currentState; } else { - // no shards to wait for - finish the snapshot + // no shards to wait for but a node is gone - this is the only case + // where we force to finish the snapshot logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately"); shards = snapshotEntry.shards(); endSnapshot(snapshotEntry); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 1cfafc1a228fe..0b6f2846d8f57 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -3060,6 +3060,7 @@ public void testGetSnapshotsFromIndexBlobOnly() throws Exception { } } + @TestLogging("org.elasticsearch.snapshots:TRACE") public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception { final Client client = client(); @@ -3075,7 +3076,7 @@ public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception { for (int i = 0; i < nbDocs; i++) { index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i); } - refresh(); + flushAndRefresh("test-idx"); assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs)); // Create a snapshot