diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java index 324cb3712adf1..1b7ead5b96510 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java @@ -22,7 +22,6 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.shard.ShardId; @@ -49,13 +48,13 @@ private SnapshotIndexShardStatus() { this.stats = new SnapshotStats(); } - SnapshotIndexShardStatus(ShardId shardId, IndexShardSnapshotStatus indexShardStatus) { + SnapshotIndexShardStatus(ShardId shardId, IndexShardSnapshotStatus.Copy indexShardStatus) { this(shardId, indexShardStatus, null); } - SnapshotIndexShardStatus(ShardId shardId, IndexShardSnapshotStatus indexShardStatus, String nodeId) { + SnapshotIndexShardStatus(ShardId shardId, IndexShardSnapshotStatus.Copy indexShardStatus, String nodeId) { super(shardId); - switch (indexShardStatus.stage()) { + switch (indexShardStatus.getStage()) { case INIT: stage = SnapshotIndexShardStage.INIT; break; @@ -72,10 +71,12 @@ private SnapshotIndexShardStatus() { stage = SnapshotIndexShardStage.FAILURE; break; default: - throw new IllegalArgumentException("Unknown stage type " + indexShardStatus.stage()); + throw new IllegalArgumentException("Unknown stage type " + indexShardStatus.getStage()); } - stats = new SnapshotStats(indexShardStatus); - failure = indexShardStatus.failure(); + this.stats = new SnapshotStats(indexShardStatus.getStartTime(), indexShardStatus.getTotalTime(), + indexShardStatus.getNumberOfFiles(), indexShardStatus.getProcessedFiles(), + indexShardStatus.getTotalSize(), indexShardStatus.getProcessedSize()); + this.failure = indexShardStatus.getFailure(); this.nodeId = nodeId; } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java index ba11e51d56f87..5b2bdd7c614c6 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java @@ -25,33 +25,28 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import java.io.IOException; public class SnapshotStats implements Streamable, ToXContentFragment { - private long startTime; + private long startTime; private long time; - private int numberOfFiles; - private int processedFiles; - private long totalSize; - private long processedSize; SnapshotStats() { } - SnapshotStats(IndexShardSnapshotStatus indexShardStatus) { - startTime = indexShardStatus.startTime(); - time = indexShardStatus.time(); - numberOfFiles = indexShardStatus.numberOfFiles(); - processedFiles = indexShardStatus.processedFiles(); - totalSize = indexShardStatus.totalSize(); - processedSize = indexShardStatus.processedSize(); + SnapshotStats(long startTime, long time, int numberOfFiles, int processedFiles, long totalSize, long processedSize) { + this.startTime = startTime; + this.time = time; + this.numberOfFiles = numberOfFiles; + this.processedFiles = processedFiles; + this.totalSize = totalSize; + this.processedSize = processedSize; } /** diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java index 872793f6ef21a..77578546b9585 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java @@ -96,7 +96,7 @@ protected NodesSnapshotStatus newResponse(Request request, List> snapshotMapBuilder = new HashMap<>(); try { - String nodeId = clusterService.localNode().getId(); + final String nodeId = clusterService.localNode().getId(); for (Snapshot snapshot : request.snapshots) { Map shardsStatus = snapshotShardsService.currentSnapshotShards(snapshot); if (shardsStatus == null) { @@ -104,15 +104,17 @@ protected NodeSnapshotStatus nodeOperation(NodeRequest request) { } Map shardMapBuilder = new HashMap<>(); for (Map.Entry shardEntry : shardsStatus.entrySet()) { - SnapshotIndexShardStatus shardStatus; - IndexShardSnapshotStatus.Stage stage = shardEntry.getValue().stage(); + final ShardId shardId = shardEntry.getKey(); + + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = shardEntry.getValue().asCopy(); + final IndexShardSnapshotStatus.Stage stage = lastSnapshotStatus.getStage(); + + String shardNodeId = null; if (stage != IndexShardSnapshotStatus.Stage.DONE && stage != IndexShardSnapshotStatus.Stage.FAILURE) { // Store node id for the snapshots that are currently running. - shardStatus = new SnapshotIndexShardStatus(shardEntry.getKey(), shardEntry.getValue(), nodeId); - } else { - shardStatus = new SnapshotIndexShardStatus(shardEntry.getKey(), shardEntry.getValue()); + shardNodeId = nodeId; } - shardMapBuilder.put(shardEntry.getKey(), shardStatus); + shardMapBuilder.put(shardEntry.getKey(), new SnapshotIndexShardStatus(shardId, lastSnapshotStatus, shardNodeId)); } snapshotMapBuilder.put(snapshot, unmodifiableMap(shardMapBuilder)); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 71bb1995dd57e..dc13c8dab5188 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -233,7 +233,8 @@ private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, Li Map shardStatues = snapshotsService.snapshotShards(request.repository(), snapshotInfo); for (Map.Entry shardStatus : shardStatues.entrySet()) { - shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), shardStatus.getValue())); + IndexShardSnapshotStatus.Copy lastSnapshotStatus = shardStatus.getValue().asCopy(); + shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), lastSnapshotStatus)); } final SnapshotsInProgress.State state; switch (snapshotInfo.state()) { diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java index 644caa7520be5..f1c247a41bb6d 100644 --- a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java +++ b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java @@ -19,6 +19,9 @@ package org.elasticsearch.index.snapshots; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + /** * Represent shard snapshot status */ @@ -47,119 +50,85 @@ public enum Stage { /** * Snapshot failed */ - FAILURE + FAILURE, + /** + * Snapshot aborted + */ + ABORTED } - private Stage stage = Stage.INIT; - + private final AtomicReference stage; private long startTime; - - private long time; - + private long totalTime; private int numberOfFiles; - - private volatile int processedFiles; - + private int processedFiles; private long totalSize; - - private volatile long processedSize; - + private long processedSize; private long indexVersion; - - private volatile boolean aborted; - private String failure; - /** - * Returns current snapshot stage - * - * @return current snapshot stage - */ - public Stage stage() { - return this.stage; - } - - /** - * Sets new snapshot stage - * - * @param stage new snapshot stage - */ - public void updateStage(Stage stage) { - this.stage = stage; - } - - /** - * Returns snapshot start time - * - * @return snapshot start time - */ - public long startTime() { - return this.startTime; - } - - /** - * Sets snapshot start time - * - * @param startTime snapshot start time - */ - public void startTime(long startTime) { + private IndexShardSnapshotStatus(final Stage stage, final long startTime, final long totalTime, + final int numberOfFiles, final int processedFiles, final long totalSize, final long processedSize, + final long indexVersion, final String failure) { + this.stage = new AtomicReference<>(Objects.requireNonNull(stage)); this.startTime = startTime; + this.totalTime = totalTime; + this.numberOfFiles = numberOfFiles; + this.processedFiles = processedFiles; + this.totalSize = totalSize; + this.processedSize = processedSize; + this.indexVersion = indexVersion; + this.failure = failure; } - /** - * Returns snapshot processing time - * - * @return processing time - */ - public long time() { - return this.time; + public synchronized Copy moveToStarted(final long startTime, final int numberOfFiles, final long totalSize) { + if (stage.compareAndSet(Stage.INIT, Stage.STARTED)) { + this.startTime = startTime; + this.numberOfFiles = numberOfFiles; + this.totalSize = totalSize; + } else { + throw new IllegalStateException("Unable to move the shard snapshot status to [STARTED]: " + + "expecting [INIT] but got [" + stage.get() + "]"); + } + return asCopy(); } - /** - * Sets snapshot processing time - * - * @param time snapshot processing time - */ - public void time(long time) { - this.time = time; + public synchronized Copy moveToFinalize(final long indexVersion) { + if (stage.compareAndSet(Stage.STARTED, Stage.FINALIZE)) { + this.indexVersion = indexVersion; + } else { + throw new IllegalStateException("Unable to move the shard snapshot status to [FINALIZE]: " + + "expecting [STARTED] but got [" + stage.get() + "]"); + } + return asCopy(); } - /** - * Returns true if snapshot process was aborted - * - * @return true if snapshot process was aborted - */ - public boolean aborted() { - return this.aborted; + public synchronized Copy moveToDone(final long endTime) { + if (stage.compareAndSet(Stage.FINALIZE, Stage.DONE)) { + this.totalTime = Math.max(0L, endTime - startTime); + } else { + throw new IllegalStateException("Unable to move the shard snapshot status to [DONE]: " + + "expecting [FINALIZE] but got [" + stage.get() + "]"); + } + return asCopy(); } - /** - * Marks snapshot as aborted - */ - public void abort() { - this.aborted = true; + public synchronized Copy abortIfNotCompleted(final String failure) { + if (stage.compareAndSet(Stage.INIT, Stage.ABORTED) || stage.compareAndSet(Stage.STARTED, Stage.ABORTED)) { + this.failure = failure; + } + return asCopy(); } - /** - * Sets files stats - * - * @param numberOfFiles number of files in this snapshot - * @param totalSize total size of files in this snapshot - */ - public void files(int numberOfFiles, long totalSize) { - this.numberOfFiles = numberOfFiles; - this.totalSize = totalSize; + public synchronized void moveToFailed(final long endTime, final String failure) { + if (stage.getAndSet(Stage.FAILURE) != Stage.FAILURE) { + this.totalTime = Math.max(0L, endTime - startTime); + this.failure = failure; + } } - /** - * Sets processed files stats - * - * @param numberOfFiles number of files in this snapshot - * @param totalSize total size of files in this snapshot - */ - public synchronized void processedFiles(int numberOfFiles, long totalSize) { - processedFiles = numberOfFiles; - processedSize = totalSize; + public boolean isAborted() { + return stage.get() == Stage.ABORTED; } /** @@ -171,71 +140,111 @@ public synchronized void addProcessedFile(long size) { } /** - * Number of files - * - * @return number of files - */ - public int numberOfFiles() { - return numberOfFiles; - } - - /** - * Total snapshot size - * - * @return snapshot size - */ - public long totalSize() { - return totalSize; - } - - /** - * Number of processed files - * - * @return number of processed files - */ - public int processedFiles() { - return processedFiles; - } - - /** - * Size of processed files + * Returns a copy of the current {@link IndexShardSnapshotStatus}. This method is + * intended to be used when a coherent state of {@link IndexShardSnapshotStatus} is needed. * - * @return size of processed files - */ - public long processedSize() { - return processedSize; - } - - - /** - * Sets index version - * - * @param indexVersion index version - */ - public void indexVersion(long indexVersion) { - this.indexVersion = indexVersion; - } - - /** - * Returns index version - * - * @return index version - */ - public long indexVersion() { - return indexVersion; - } - - /** - * Sets the reason for the failure if the snapshot is in the {@link IndexShardSnapshotStatus.Stage#FAILURE} state - */ - public void failure(String failure) { - this.failure = failure; - } - - /** - * Returns the reason for the failure if the snapshot is in the {@link IndexShardSnapshotStatus.Stage#FAILURE} state - */ - public String failure() { - return failure; + * @return a {@link IndexShardSnapshotStatus.Copy} + */ + public synchronized IndexShardSnapshotStatus.Copy asCopy() { + return new IndexShardSnapshotStatus.Copy(stage.get(), startTime, totalTime, numberOfFiles, processedFiles, totalSize, processedSize, + indexVersion, failure); + } + + public static IndexShardSnapshotStatus newInitializing() { + return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, null); + } + + public static IndexShardSnapshotStatus newFailed(final String failure) { + assert failure != null : "expecting non null failure for a failed IndexShardSnapshotStatus"; + if (failure == null) { + throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus"); + } + return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, failure); + } + + public static IndexShardSnapshotStatus newDone(final long startTime, final long totalTime, final int files, final long size) { + // The snapshot is done which means the number of processed files is the same as total + return new IndexShardSnapshotStatus(Stage.DONE, startTime, totalTime, files, files, size, size, 0, null); + } + + /** + * Returns an immutable state of {@link IndexShardSnapshotStatus} at a given point in time. + */ + public static class Copy { + + private final Stage stage; + private final long startTime; + private final long totalTime; + private final int numberOfFiles; + private final int processedFiles; + private final long totalSize; + private final long processedSize; + private final long indexVersion; + private final String failure; + + public Copy(final Stage stage, final long startTime, final long totalTime, + final int numberOfFiles, final int processedFiles, final long totalSize, final long processedSize, + final long indexVersion, final String failure) { + this.stage = stage; + this.startTime = startTime; + this.totalTime = totalTime; + this.numberOfFiles = numberOfFiles; + this.processedFiles = processedFiles; + this.totalSize = totalSize; + this.processedSize = processedSize; + this.indexVersion = indexVersion; + this.failure = failure; + } + + public Stage getStage() { + return stage; + } + + public long getStartTime() { + return startTime; + } + + public long getTotalTime() { + return totalTime; + } + + public int getNumberOfFiles() { + return numberOfFiles; + } + + public int getProcessedFiles() { + return processedFiles; + } + + public long getTotalSize() { + return totalSize; + } + + public long getProcessedSize() { + return processedSize; + } + + public long getIndexVersion() { + return indexVersion; + } + + public String getFailure() { + return failure; + } + + @Override + public String toString() { + return "index shard snapshot status (" + + "stage=" + stage + + ", startTime=" + startTime + + ", totalTime=" + totalTime + + ", numberOfFiles=" + numberOfFiles + + ", processedFiles=" + processedFiles + + ", totalSize=" + totalSize + + ", processedSize=" + processedSize + + ", indexVersion=" + indexVersion + + ", failure='" + failure + '\'' + + ')'; + } } } diff --git a/core/src/main/java/org/elasticsearch/repositories/Repository.java b/core/src/main/java/org/elasticsearch/repositories/Repository.java index f711a72b67757..4c3d58e67ff72 100644 --- a/core/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/core/src/main/java/org/elasticsearch/repositories/Repository.java @@ -180,7 +180,7 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller. *

* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check - * {@link IndexShardSnapshotStatus#aborted()} to see if the snapshot process should be aborted. + * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted. * * @param shard shard to be snapshotted * @param snapshotId snapshot id diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 06812be5aab2c..9068c6ff39743 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -805,17 +805,11 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef) t @Override public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { - SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, indexId, snapshotStatus); - snapshotStatus.startTime(System.currentTimeMillis()); - + SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, indexId, snapshotStatus, System.currentTimeMillis()); try { snapshotContext.snapshot(snapshotIndexCommit); - snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime()); - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE); } catch (Exception e) { - snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime()); - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE); - snapshotStatus.failure(ExceptionsHelper.detailedMessage(e)); + snapshotStatus.moveToFailed(System.currentTimeMillis(), ExceptionsHelper.detailedMessage(e)); if (e instanceof IndexShardSnapshotFailedException) { throw (IndexShardSnapshotFailedException) e; } else { @@ -838,14 +832,7 @@ public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version versio public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { Context context = new Context(snapshotId, version, indexId, shardId); BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot(); - IndexShardSnapshotStatus status = new IndexShardSnapshotStatus(); - status.updateStage(IndexShardSnapshotStatus.Stage.DONE); - status.startTime(snapshot.startTime()); - status.files(snapshot.numberOfFiles(), snapshot.totalSize()); - // The snapshot is done which means the number of processed files is the same as total - status.processedFiles(snapshot.numberOfFiles(), snapshot.totalSize()); - status.time(snapshot.time()); - return status; + return IndexShardSnapshotStatus.newDone(snapshot.startTime(), snapshot.time(), snapshot.numberOfFiles(), snapshot.totalSize()); } @Override @@ -1103,8 +1090,8 @@ protected Tuple buildBlobStoreIndexShardS private class SnapshotContext extends Context { private final Store store; - private final IndexShardSnapshotStatus snapshotStatus; + private final long startTime; /** * Constructs new context @@ -1114,10 +1101,11 @@ private class SnapshotContext extends Context { * @param indexId the id of the index being snapshotted * @param snapshotStatus snapshot status to report progress */ - SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus) { + SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus, long startTime) { super(snapshotId, Version.CURRENT, indexId, shard.shardId()); this.snapshotStatus = snapshotStatus; this.store = shard.store(); + this.startTime = startTime; } /** @@ -1125,24 +1113,25 @@ private class SnapshotContext extends Context { * * @param snapshotIndexCommit snapshot commit point */ - public void snapshot(IndexCommit snapshotIndexCommit) { + public void snapshot(final IndexCommit snapshotIndexCommit) { logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name()); - store.incRef(); + + final Map blobs; try { - final Map blobs; - try { - blobs = blobContainer.listBlobs(); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e); - } + blobs = blobContainer.listBlobs(); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e); + } - long generation = findLatestFileNameGeneration(blobs); - Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs); - BlobStoreIndexShardSnapshots snapshots = tuple.v1(); - int fileListGeneration = tuple.v2(); + long generation = findLatestFileNameGeneration(blobs); + Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs); + BlobStoreIndexShardSnapshots snapshots = tuple.v1(); + int fileListGeneration = tuple.v2(); - final List indexCommitPointFiles = new ArrayList<>(); + final List indexCommitPointFiles = new ArrayList<>(); + store.incRef(); + try { int indexNumberOfFiles = 0; long indexTotalFilesSize = 0; ArrayList filesToSnapshot = new ArrayList<>(); @@ -1156,10 +1145,11 @@ public void snapshot(IndexCommit snapshotIndexCommit) { throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); } for (String fileName : fileNames) { - if (snapshotStatus.aborted()) { + if (snapshotStatus.isAborted()) { logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); throw new IndexShardSnapshotFailedException(shardId, "Aborted"); } + logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName); final StoreFileMetaData md = metadata.get(fileName); BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null; @@ -1195,14 +1185,7 @@ public void snapshot(IndexCommit snapshotIndexCommit) { } } - snapshotStatus.files(indexNumberOfFiles, indexTotalFilesSize); - - if (snapshotStatus.aborted()) { - logger.debug("[{}] [{}] Aborted during initialization", shardId, snapshotId); - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); - } - - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED); + snapshotStatus.moveToStarted(startTime, indexNumberOfFiles, indexTotalFilesSize); for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) { try { @@ -1211,36 +1194,42 @@ public void snapshot(IndexCommit snapshotIndexCommit) { throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); } } - - snapshotStatus.indexVersion(snapshotIndexCommit.getGeneration()); - // now create and write the commit point - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE); - - BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), - snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(), - // snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong - System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize); - //TODO: The time stored in snapshot doesn't include cleanup time. - logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); - try { - indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getUUID()); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); - } - - // delete all files that are not referenced by any commit point - // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones - List newSnapshotsList = new ArrayList<>(); - newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); - for (SnapshotFiles point : snapshots) { - newSnapshotsList.add(point); - } - // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index - finalize(newSnapshotsList, fileListGeneration + 1, blobs); - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE); } finally { store.decRef(); } + + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration()); + + // now create and write the commit point + final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), + lastSnapshotStatus.getIndexVersion(), + indexCommitPointFiles, + lastSnapshotStatus.getStartTime(), + // snapshotStatus.startTime() is assigned on the same machine, + // so it's safe to use with VLong + System.currentTimeMillis() - lastSnapshotStatus.getStartTime(), + lastSnapshotStatus.getNumberOfFiles(), + lastSnapshotStatus.getTotalSize()); + + //TODO: The time stored in snapshot doesn't include cleanup time. + logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); + try { + indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getUUID()); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); + } + + // delete all files that are not referenced by any commit point + // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones + List newSnapshotsList = new ArrayList<>(); + newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); + for (SnapshotFiles point : snapshots) { + newSnapshotsList.add(point); + } + // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index + finalize(newSnapshotsList, fileListGeneration + 1, blobs); + snapshotStatus.moveToDone(System.currentTimeMillis()); + } /** @@ -1335,7 +1324,7 @@ public int read(byte[] b, int off, int len) throws IOException { } private void checkAborted() { - if (snapshotStatus.aborted()) { + if (snapshotStatus.isAborted()) { logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); throw new IndexShardSnapshotFailedException(shardId, "Aborted"); } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 248f9a555a3d6..35e0b10fd8769 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -51,7 +51,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.engine.Engine; @@ -188,7 +187,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh Map shards = snapshotShards.getValue().shards; if (shards.containsKey(shardId)) { logger.debug("[{}] shard closing, abort snapshotting for snapshot [{}]", shardId, snapshotShards.getKey().getSnapshotId()); - shards.get(shardId).abort(); + shards.get(shardId).abortIfNotCompleted("shard is closing, aborting"); } } } @@ -230,9 +229,7 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { // running shards is missed, then the snapshot is removed is a subsequent cluster // state update, which is being processed here for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().shards.values()) { - if (snapshotStatus.stage() == Stage.INIT || snapshotStatus.stage() == Stage.STARTED) { - snapshotStatus.abort(); - } + snapshotStatus.abortIfNotCompleted("snapshot has been removed in cluster state, aborting"); } } } @@ -255,7 +252,7 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { if (localNodeId.equals(shard.value.nodeId())) { if (shard.value.state() == State.INIT && (snapshotShards == null || !snapshotShards.shards.containsKey(shard.key))) { logger.trace("[{}] - Adding shard to the queue", shard.key); - startedShards.put(shard.key, new IndexShardSnapshotStatus()); + startedShards.put(shard.key, IndexShardSnapshotStatus.newInitializing()); } } } @@ -278,30 +275,26 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { // Abort all running shards for this snapshot SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot()); if (snapshotShards != null) { + final String failure = "snapshot has been aborted"; for (ObjectObjectCursor shard : entry.shards()) { - IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.key); + + final IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.key); if (snapshotStatus != null) { - switch (snapshotStatus.stage()) { - case INIT: - case STARTED: - snapshotStatus.abort(); - break; - case FINALIZE: - logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " + - "letting it finish", entry.snapshot(), shard.key); - break; - case DONE: - logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " + - "updating status on the master", entry.snapshot(), shard.key); - notifySuccessfulSnapshotShard(entry.snapshot(), shard.key, localNodeId); - break; - case FAILURE: - logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + - "updating status on the master", entry.snapshot(), shard.key); - notifyFailedSnapshotShard(entry.snapshot(), shard.key, localNodeId, snapshotStatus.failure()); - break; - default: - throw new IllegalStateException("Unknown snapshot shard stage " + snapshotStatus.stage()); + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.abortIfNotCompleted(failure); + final Stage stage = lastSnapshotStatus.getStage(); + if (stage == Stage.FINALIZE) { + logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " + + "letting it finish", entry.snapshot(), shard.key); + + } else if (stage == Stage.DONE) { + logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " + + "updating status on the master", entry.snapshot(), shard.key); + notifySuccessfulSnapshotShard(entry.snapshot(), shard.key, localNodeId); + + } else if (stage == Stage.FAILURE) { + logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + + "updating status on the master", entry.snapshot(), shard.key); + notifyFailedSnapshotShard(entry.snapshot(), shard.key, localNodeId, lastSnapshotStatus.getFailure()); } } } @@ -400,12 +393,8 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) { repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus); if (logger.isDebugEnabled()) { - StringBuilder details = new StringBuilder(); - details.append(" index : version [").append(snapshotStatus.indexVersion()); - details.append("], number_of_files [").append(snapshotStatus.numberOfFiles()); - details.append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n"); - logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, repository, - TimeValue.timeValueMillis(snapshotStatus.time()), details); + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); + logger.debug("snapshot ({}) completed to {} with {}", snapshot, repository, lastSnapshotStatus); } } } catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e) { @@ -432,21 +421,22 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { ImmutableOpenMap masterShards = snapshot.shards(); for(Map.Entry localShard : localShards.entrySet()) { ShardId shardId = localShard.getKey(); - IndexShardSnapshotStatus localShardStatus = localShard.getValue(); ShardSnapshotStatus masterShard = masterShards.get(shardId); if (masterShard != null && masterShard.state().completed() == false) { + final IndexShardSnapshotStatus.Copy indexShardSnapshotStatus = localShard.getValue().asCopy(); + final Stage stage = indexShardSnapshotStatus.getStage(); // Master knows about the shard and thinks it has not completed - if (localShardStatus.stage() == Stage.DONE) { + if (stage == Stage.DONE) { // but we think the shard is done - we need to make new master know that the shard is done logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " + "updating status on the master", snapshot.snapshot(), shardId); notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localNodeId); - } else if (localShard.getValue().stage() == Stage.FAILURE) { + } else if (stage == Stage.FAILURE) { // but we think the shard failed - we need to make new master know that the shard failed logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, " + "updating status on master", snapshot.snapshot(), shardId); - notifyFailedSnapshotShard(snapshot.snapshot(), shardId, localNodeId, localShardStatus.failure()); + notifyFailedSnapshotShard(snapshot.snapshot(), shardId, localNodeId, indexShardSnapshotStatus.getFailure()); } } } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index e19394714731f..ef999fe9d0045 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -598,10 +598,7 @@ public Map snapshotShards(final String reposi ShardId shardId = new ShardId(indexMetaData.getIndex(), i); SnapshotShardFailure shardFailure = findShardFailure(snapshotInfo.shardFailures(), shardId); if (shardFailure != null) { - IndexShardSnapshotStatus shardSnapshotStatus = new IndexShardSnapshotStatus(); - shardSnapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE); - shardSnapshotStatus.failure(shardFailure.reason()); - shardStatus.put(shardId, shardSnapshotStatus); + shardStatus.put(shardId, IndexShardSnapshotStatus.newFailed(shardFailure.reason())); } else { final IndexShardSnapshotStatus shardSnapshotStatus; if (snapshotInfo.state() == SnapshotState.FAILED) { @@ -612,9 +609,7 @@ public Map snapshotShards(final String reposi // snapshot status will throw an exception. Instead, we create // a status for the shard to indicate that the shard snapshot // could not be taken due to partial being set to false. - shardSnapshotStatus = new IndexShardSnapshotStatus(); - shardSnapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE); - shardSnapshotStatus.failure("skipped"); + shardSnapshotStatus = IndexShardSnapshotStatus.newFailed("skipped"); } else { shardSnapshotStatus = repository.getShardSnapshotStatus( snapshotInfo.snapshotId(), diff --git a/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java b/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java index 651cd96776e75..8431c8fa69f54 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java @@ -93,7 +93,7 @@ public void testRetryPostingSnapshotStatusMessages() throws Exception { assertBusy(() -> { final Snapshot snapshot = new Snapshot("test-repo", snapshotId); List stages = snapshotShardsService.currentSnapshotShards(snapshot) - .values().stream().map(IndexShardSnapshotStatus::stage).collect(Collectors.toList()); + .values().stream().map(status -> status.asCopy().getStage()).collect(Collectors.toList()); assertThat(stages, hasSize(shards)); assertThat(stages, everyItem(equalTo(IndexShardSnapshotStatus.Stage.DONE))); }); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index c06b4a433cb0a..4737befa30e45 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -619,16 +619,18 @@ protected void recoverShardFromSnapshot(final IndexShard shard, protected void snapshotShard(final IndexShard shard, final Snapshot snapshot, final Repository repository) throws IOException { - final IndexShardSnapshotStatus snapshotStatus = new IndexShardSnapshotStatus(); + final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(); try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(true)) { Index index = shard.shardId().getIndex(); IndexId indexId = new IndexId(index.getName(), index.getUUID()); repository.snapshotShard(shard, snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(), snapshotStatus); } - assertEquals(IndexShardSnapshotStatus.Stage.DONE, snapshotStatus.stage()); - assertEquals(shard.snapshotStoreMetadata().size(), snapshotStatus.numberOfFiles()); - assertNull(snapshotStatus.failure()); + + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); + assertEquals(IndexShardSnapshotStatus.Stage.DONE, lastSnapshotStatus.getStage()); + assertEquals(shard.snapshotStoreMetadata().size(), lastSnapshotStatus.getNumberOfFiles()); + assertNull(lastSnapshotStatus.getFailure()); } /**