Skip to content

Commit

Permalink
Avoid concurrent snapshot finalizations when deleting an INIT snapshot (
Browse files Browse the repository at this point in the history
#28078)

This commit removes the finalization of a snapshot by the snapshot deletion request. This way, the deletion marks the snapshot as ABORTED in cluster state and waits for the snapshot completion. It is the responsability of the snapshot execution to detect the abortion and terminates itself correctly. This
avoids concurrent snapshot finalizations and also ordinates the operations: the deletion
aborts the snapshot and waits for the snapshot completion, the creation detects the abortion
and stops by itself and finalizes the snapshot, then the deletion resumes and continues
the deletion process.
  • Loading branch information
tlrx committed Jan 15, 2018
1 parent 9b6d37a commit 6af4bed
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -372,26 +372,32 @@ private void beginSnapshot(final ClusterState clusterState,
return;
}
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
boolean accepted = false;
SnapshotsInProgress.Entry updatedSnapshot;

SnapshotsInProgress.Entry endSnapshot;
String failure = null;

@Override
public ClusterState execute(ClusterState currentState) {
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.snapshot().equals(snapshot.snapshot()) && entry.state() != State.ABORTED) {
// Replace the snapshot that was just created
if (entry.snapshot().equals(snapshot.snapshot()) == false) {
entries.add(entry);
continue;
}

if (entry.state() != State.ABORTED) {
// Replace the snapshot that was just intialized
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
if (!partial) {
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData());
Set<String> missing = indicesWithMissingShards.v1();
Set<String> closed = indicesWithMissingShards.v2();
if (missing.isEmpty() == false || closed.isEmpty() == false) {
StringBuilder failureMessage = new StringBuilder();
updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
entries.add(updatedSnapshot);
endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
entries.add(endSnapshot);

final StringBuilder failureMessage = new StringBuilder();
if (missing.isEmpty() == false) {
failureMessage.append("Indices don't have primary shards ");
failureMessage.append(missing);
Expand All @@ -407,13 +413,16 @@ public ClusterState execute(ClusterState currentState) {
continue;
}
}
updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
entries.add(updatedSnapshot);
if (!completed(shards.values())) {
accepted = true;
if (completed(shards.values())) {
endSnapshot = updatedSnapshot;
}
} else {
entries.add(entry);
assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization";
failure = "snapshot was aborted during initialization";
endSnapshot = entry;
entries.add(endSnapshot);
}
}
return ClusterState.builder(currentState)
Expand Down Expand Up @@ -448,8 +457,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
// We should end snapshot only if 1) we didn't accept it for processing (which happens when there
// is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should
// go ahead and continue working on this snapshot rather then end here.
if (!accepted && updatedSnapshot != null) {
endSnapshot(updatedSnapshot, failure);
if (endSnapshot != null) {
endSnapshot(endSnapshot, failure);
}
}
});
Expand Down Expand Up @@ -749,6 +758,11 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}
entries.add(updatedSnapshot);
} else if (snapshot.state() == State.INIT && newMaster) {
changed = true;
// Mark the snapshot as aborted as it failed to start from the previous master
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards());
entries.add(updatedSnapshot);

// Clean up the snapshot that failed to start from the old master
deleteSnapshot(snapshot.snapshot(), new DeleteSnapshotListener() {
@Override
Expand Down Expand Up @@ -934,7 +948,7 @@ private Tuple<Set<String>, Set<String>> indicesWithMissingShards(ImmutableOpenMa
*
* @param entry snapshot
*/
void endSnapshot(SnapshotsInProgress.Entry entry) {
void endSnapshot(final SnapshotsInProgress.Entry entry) {
endSnapshot(entry, null);
}

Expand Down Expand Up @@ -1142,24 +1156,26 @@ public ClusterState execute(ClusterState currentState) throws Exception {
} else {
// This snapshot is currently running - stopping shards first
waitForSnapshot = true;
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
if (snapshotEntry.state() == State.STARTED && snapshotEntry.shards() != null) {
// snapshot is currently running - stop started shards
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();

final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;

final State state = snapshotEntry.state();
if (state == State.INIT) {
// snapshot is still initializing, mark it as aborted
shards = snapshotEntry.shards();

} else if (state == State.STARTED) {
// snapshot is started - mark every non completed shard as aborted
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
ShardSnapshotStatus status = shardEntry.value;
if (!status.state().completed()) {
shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED,
"aborted by snapshot deletion"));
} else {
shardsBuilder.put(shardEntry.key, status);
if (status.state().completed() == false) {
status = new ShardSnapshotStatus(status.nodeId(), State.ABORTED, "aborted by snapshot deletion");
}
shardsBuilder.put(shardEntry.key, status);
}
shards = shardsBuilder.build();
} else if (snapshotEntry.state() == State.INIT) {
// snapshot hasn't started yet - end it
shards = snapshotEntry.shards();
endSnapshot(snapshotEntry);

} else {
boolean hasUncompletedShards = false;
// Cleanup in case a node gone missing and snapshot wasn't updated for some reason
Expand All @@ -1176,7 +1192,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes");
return currentState;
} else {
// no shards to wait for - finish the snapshot
// no shards to wait for but a node is gone - this is the only case
// where we force to finish the snapshot
logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately");
shards = snapshotEntry.shards();
endSnapshot(snapshotEntry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3060,6 +3060,7 @@ public void testGetSnapshotsFromIndexBlobOnly() throws Exception {
}
}

@TestLogging("org.elasticsearch.snapshots:TRACE")
public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
final Client client = client();

Expand All @@ -3075,7 +3076,7 @@ public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
for (int i = 0; i < nbDocs; i++) {
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
flushAndRefresh("test-idx");
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs));

// Create a snapshot
Expand Down

0 comments on commit 6af4bed

Please sign in to comment.