diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotException.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotException.java index 0acd73d62eab9..d389ed634f3af 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotException.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotException.java @@ -66,7 +66,7 @@ public SnapshotException(final String repositoryName, final String snapshotName, } public SnapshotException(final String repositoryName, final String snapshotName, final String msg, final Throwable cause) { - super("[" + repositoryName + ":" + snapshotName + "]" + msg, cause); + super("[" + repositoryName + ":" + snapshotName + "] " + msg, cause); this.repositoryName = repositoryName; this.snapshotName = snapshotName; } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotMissingException.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotMissingException.java index b7f2c6af4a1da..5f0979e38d8b9 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotMissingException.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotMissingException.java @@ -30,15 +30,15 @@ public class SnapshotMissingException extends SnapshotException { public SnapshotMissingException(final String repositoryName, final SnapshotId snapshotId, final Throwable cause) { - super(repositoryName, snapshotId, " is missing", cause); + super(repositoryName, snapshotId, "is missing", cause); } public SnapshotMissingException(final String repositoryName, final SnapshotId snapshotId) { - super(repositoryName, snapshotId, " is missing"); + super(repositoryName, snapshotId, "is missing"); } public SnapshotMissingException(final String repositoryName, final String snapshotName) { - super(repositoryName, snapshotName, " is missing"); + super(repositoryName, snapshotName, "is missing"); } public SnapshotMissingException(StreamInput in) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 8d9d1fad5157f..e0f087d8226ec 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -146,7 +147,6 @@ protected void doStop() { } finally { shutdownLock.unlock(); } - } @Override @@ -157,14 +157,16 @@ protected void doClose() { @Override public void applyClusterState(ClusterChangedEvent event) { try { - SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE); - SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE); - - if ((prev == null && curr != null) || (prev != null && prev.equals(curr) == false)) { + SnapshotsInProgress previousSnapshots = event.previousState().custom(SnapshotsInProgress.TYPE); + SnapshotsInProgress currentSnapshots = event.state().custom(SnapshotsInProgress.TYPE); + if ((previousSnapshots == null && currentSnapshots != null) + || (previousSnapshots != null && previousSnapshots.equals(currentSnapshots) == false)) { processIndexShardSnapshots(event); } - String masterNodeId = event.state().nodes().getMasterNodeId(); - if (masterNodeId != null && masterNodeId.equals(event.previousState().nodes().getMasterNodeId()) == false) { + + String previousMasterNodeId = event.previousState().nodes().getMasterNodeId(); + String currentMasterNodeId = event.state().nodes().getMasterNodeId(); + if (currentMasterNodeId != null && currentMasterNodeId.equals(previousMasterNodeId) == false) { syncShardStatsOnNewMaster(event); } @@ -281,17 +283,18 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { snapshotStatus.abort(); break; case FINALIZE: - logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, letting it finish", entry.snapshot(), shard.key); + logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " + + "letting it finish", entry.snapshot(), shard.key); break; case DONE: - logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshot(), shard.key); - updateIndexShardSnapshotStatus(entry.snapshot(), shard.key, - new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); + logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " + + "updating status on the master", entry.snapshot(), shard.key); + notifySuccessfulSnapshotShard(entry.snapshot(), shard.key, localNodeId, masterNode); break; case FAILURE: - logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshot(), shard.key); - updateIndexShardSnapshotStatus(entry.snapshot(), shard.key, - new ShardSnapshotStatus(localNodeId, State.FAILED, snapshotStatus.failure()), masterNode); + logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + + "updating status on the master", entry.snapshot(), shard.key); + notifyFailedSnapshotShard(entry.snapshot(), shard.key, localNodeId, snapshotStatus.failure(), masterNode); break; default: throw new IllegalStateException("Unknown snapshot shard stage " + snapshotStatus.stage()); @@ -320,34 +323,47 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { if (newSnapshots.isEmpty() == false) { Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); for (final Map.Entry> entry : newSnapshots.entrySet()) { - Map indicesMap = snapshotIndices.get(entry.getKey()); + final Snapshot snapshot = entry.getKey(); + final Map indicesMap = snapshotIndices.get(snapshot); assert indicesMap != null; + for (final Map.Entry shardEntry : entry.getValue().entrySet()) { final ShardId shardId = shardEntry.getKey(); - try { - final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); - final IndexId indexId = indicesMap.get(shardId.getIndexName()); - assert indexId != null; - executor.execute(new AbstractRunnable() { - @Override - public void doRun() { - snapshot(indexShard, entry.getKey(), indexId, shardEntry.getValue()); - updateIndexShardSnapshotStatus(entry.getKey(), shardId, - new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); - } + final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); + final IndexId indexId = indicesMap.get(shardId.getIndexName()); + assert indexId != null; + executor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to create snapshot", shardId, entry.getKey()), e); - updateIndexShardSnapshotStatus(entry.getKey(), shardId, - new ShardSnapshotStatus(localNodeId, State.FAILED, ExceptionsHelper.detailedMessage(e)), masterNode); - } + final SetOnce failure = new SetOnce<>(); - }); - } catch (Exception e) { - updateIndexShardSnapshotStatus(entry.getKey(), shardId, - new ShardSnapshotStatus(localNodeId, State.FAILED, ExceptionsHelper.detailedMessage(e)), masterNode); - } + @Override + public void doRun() { + snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); + } + + @Override + public void onFailure(Exception e) { + logger.warn((Supplier) () -> + new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); + failure.set(e); + } + + @Override + public void onRejection(Exception e) { + failure.set(e); + } + + @Override + public void onAfter() { + final Exception exception = failure.get(); + if (exception != null) { + final String failure = ExceptionsHelper.detailedMessage(exception); + notifyFailedSnapshotShard(snapshot, shardId, localNodeId, failure, masterNode); + } else { + notifySuccessfulSnapshotShard(snapshot, shardId, localNodeId, masterNode); + } + } + }); } } } @@ -360,34 +376,36 @@ public void onFailure(Exception e) { * @param snapshotStatus snapshot status */ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId, final IndexShardSnapshotStatus snapshotStatus) { - Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository()); - ShardId shardId = indexShard.shardId(); - if (!indexShard.routingEntry().primary()) { + final ShardId shardId = indexShard.shardId(); + if (indexShard.routingEntry().primary() == false) { throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary"); } if (indexShard.routingEntry().relocating()) { // do not snapshot when in the process of relocation of primaries so we won't get conflicts throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating"); } - if (indexShard.state() == IndexShardState.CREATED || indexShard.state() == IndexShardState.RECOVERING) { + + final IndexShardState indexShardState = indexShard.state(); + if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) { // shard has just been created, or still recovering throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet"); } + final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository()); try { // we flush first to make sure we get the latest writes snapshotted try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) { repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus); if (logger.isDebugEnabled()) { - StringBuilder sb = new StringBuilder(); - sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n"); + StringBuilder details = new StringBuilder(); + details.append(" index : version [").append(snapshotStatus.indexVersion()); + details.append("], number_of_files [").append(snapshotStatus.numberOfFiles()); + details.append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n"); logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, repository, - TimeValue.timeValueMillis(snapshotStatus.time()), sb); + TimeValue.timeValueMillis(snapshotStatus.time()), details); } } - } catch (SnapshotFailedEngineException e) { - throw e; - } catch (IndexShardSnapshotFailedException e) { + } catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e) { throw e; } catch (Exception e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to snapshot", e); @@ -402,6 +420,7 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { if (snapshotsInProgress == null) { return; } + final String localNodeId = event.state().nodes().getLocalNodeId(); final DiscoveryNode masterNode = event.state().nodes().getMasterNode(); for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { @@ -417,15 +436,16 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { // Master knows about the shard and thinks it has not completed if (localShardStatus.stage() == Stage.DONE) { // but we think the shard is done - we need to make new master know that the shard is done - logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, updating status on the master", snapshot.snapshot(), shardId); - updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId, - new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); + logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " + + "updating status on the master", snapshot.snapshot(), shardId); + notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localNodeId, masterNode); + } else if (localShard.getValue().stage() == Stage.FAILURE) { // but we think the shard failed - we need to make new master know that the shard failed - logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, updating status on master", snapshot.snapshot(), shardId); - updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId, - new ShardSnapshotStatus(localNodeId, State.FAILED, localShardStatus.failure()), masterNode); - + logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, " + + "updating status on master", snapshot.snapshot(), shardId); + final String failure = localShardStatus.failure(); + notifyFailedSnapshotShard(snapshot.snapshot(), shardId, localNodeId, failure, masterNode); } } } @@ -445,7 +465,6 @@ private SnapshotShards(Map shards) { } } - /** * Internal request that is used to send changes in snapshot status to master */ @@ -498,15 +517,33 @@ public String toString() { } } - /** - * Updates the shard status - */ - public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) { - UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); + /** Notify the master node that the given shard has been successfully snapshotted **/ + void notifySuccessfulSnapshotShard(final Snapshot snapshot, + final ShardId shardId, + final String localNodeId, + final DiscoveryNode masterNode) { + sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); + } + + /** Notify the master node that the given shard failed to be snapshotted **/ + void notifyFailedSnapshotShard(final Snapshot snapshot, + final ShardId shardId, + final String localNodeId, + final String failure, + final DiscoveryNode masterNode) { + sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(localNodeId, State.FAILED, failure), masterNode); + } + + /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */ + void sendSnapshotShardUpdate(final Snapshot snapshot, + final ShardId shardId, + final ShardSnapshotStatus status, + final DiscoveryNode masterNode) { try { - transportService.sendRequest(master, UPDATE_SNAPSHOT_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); + final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); + transportService.sendRequest(masterNode, UPDATE_SNAPSHOT_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", request.snapshot(), request.status()), e); + logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e); } } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 2085368744af9..be5128cf43ba5 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -96,7 +96,7 @@ * kicks in and initializes the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state *
  • Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(ClusterChangedEvent)} method
  • - *
  • Once shard snapshot is created data node updates state of the shard in the cluster state using the {@link SnapshotShardsService#updateIndexShardSnapshotStatus} method
  • + *
  • Once shard snapshot is created data node updates state of the shard in the cluster state using the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus, DiscoveryNode)} method
  • *
  • When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot as completed
  • *
  • After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository, * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state
  • @@ -381,7 +381,7 @@ public ClusterState execute(ClusterState currentState) { SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); List entries = new ArrayList<>(); for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - if (entry.snapshot().equals(snapshot.snapshot())) { + if (entry.snapshot().equals(snapshot.snapshot()) && entry.state() != State.ABORTED) { // Replace the snapshot that was just created ImmutableOpenMap shards = shards(currentState, entry.indices()); if (!partial) { @@ -392,11 +392,11 @@ public ClusterState execute(ClusterState currentState) { StringBuilder failureMessage = new StringBuilder(); updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards); entries.add(updatedSnapshot); - if (missing.isEmpty() == false ) { + if (missing.isEmpty() == false) { failureMessage.append("Indices don't have primary shards "); failureMessage.append(missing); } - if (closed.isEmpty() == false ) { + if (closed.isEmpty() == false) { if (failureMessage.length() > 0) { failureMessage.append("; "); } @@ -1235,7 +1235,7 @@ public void onSnapshotFailure(Snapshot failedSnapshot, Exception e) { "could not be found after failing to abort.", smex.getSnapshotName()), e); listener.onFailure(new SnapshotException(snapshot, - "Tried deleting in-progress snapshot [{}], but it " + + "Tried deleting in-progress snapshot [" + smex.getSnapshotName() + "], but it " + "could not be found after failing to abort.", smex)); } }); @@ -1290,6 +1290,8 @@ private void deleteSnapshotFromRepository(final Snapshot snapshot, @Nullable fin try { Repository repository = repositoriesService.repository(snapshot.getRepository()); repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId); + logger.info("snapshot [{}] deleted", snapshot); + removeSnapshotDeletionFromClusterState(snapshot, null, listener); } catch (Exception ex) { removeSnapshotDeletionFromClusterState(snapshot, ex, listener); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 650acb369b99f..1cfafc1a228fe 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage; @@ -45,6 +46,7 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; @@ -99,6 +101,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -3057,6 +3060,74 @@ public void testGetSnapshotsFromIndexBlobOnly() throws Exception { } } + public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception { + final Client client = client(); + + // Blocks on initialization + assertAcked(client.admin().cluster().preparePutRepository("repository") + .setType("mock").setSettings(Settings.builder() + .put("location", randomRepoPath()) + .put("block_on_init", true) + )); + + createIndex("test-idx"); + final int nbDocs = scaledRandomIntBetween(100, 500); + for (int i = 0; i < nbDocs; i++) { + index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs)); + + // Create a snapshot + client.admin().cluster().prepareCreateSnapshot("repository", "snap").execute(); + waitForBlock(internalCluster().getMasterName(), "repository", TimeValue.timeValueMinutes(1)); + boolean blocked = true; + + // Snapshot is initializing (and is blocked at this stage) + SnapshotsStatusResponse snapshotsStatus = client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get(); + assertThat(snapshotsStatus.getSnapshots().iterator().next().getState(), equalTo(State.INIT)); + + final List states = new CopyOnWriteArrayList<>(); + final ClusterStateListener listener = event -> { + SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); + for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { + if ("snap".equals(entry.snapshot().getSnapshotId().getName())) { + states.add(entry.state()); + } + } + }; + + try { + // Record the upcoming states of the snapshot on all nodes + internalCluster().getInstances(ClusterService.class).forEach(clusterService -> clusterService.addListener(listener)); + + // Delete the snapshot while it is being initialized + ActionFuture delete = client.admin().cluster().prepareDeleteSnapshot("repository", "snap").execute(); + + // The deletion must set the snapshot in the ABORTED state + assertBusy(() -> { + SnapshotsStatusResponse status = client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get(); + assertThat(status.getSnapshots().iterator().next().getState(), equalTo(State.ABORTED)); + }); + + // Now unblock the repository + unblockNode("repository", internalCluster().getMasterName()); + blocked = false; + + assertAcked(delete.get()); + expectThrows(SnapshotMissingException.class, () -> + client.admin().cluster().prepareGetSnapshots("repository").setSnapshots("snap").get()); + + assertFalse("Expecting snapshot state to be updated", states.isEmpty()); + assertFalse("Expecting snapshot to be aborted and not started at all", states.contains(State.STARTED)); + } finally { + internalCluster().getInstances(ClusterService.class).forEach(clusterService -> clusterService.removeListener(listener)); + if (blocked) { + unblockNode("repository", internalCluster().getMasterName()); + } + } + } + private void verifySnapshotInfo(final GetSnapshotsResponse response, final Map> indicesPerSnapshot) { for (SnapshotInfo snapshotInfo : response.getSnapshots()) { final List expected = snapshotInfo.indices();