Skip to content

Commit

Permalink
Make snapshot deletion faster
Browse files Browse the repository at this point in the history
The delete snapshot task takes longer than expected. A major reason for this is
that the (often many) stale indices are deleted iteratively.
In this commit we change the deletion to be concurrent using the SNAPSHOT threadpool.
Notice that in order to avoid putting too many delete tasks on the threadpool
queue a similar methodology was used as in `executeOneFileSnapshot()`. This is due to
 the fact that the threadpool should allow other tasks to use this threadpool without
too much of a delay.

fixes issue elastic#61513 from Elasticsearch project
  • Loading branch information
AmiStrn committed Feb 18, 2021
1 parent 747e1cc commit c6adca6
Showing 1 changed file with 44 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -916,23 +916,25 @@ private void cleanupStaleBlobs(Collection<SnapshotId> deletedSnapshots, Map<Stri
listener.onResponse(deleteResult);
}, listener::onFailure), 2);

final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
if (staleRootBlobs.isEmpty()) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(groupedListener, () -> {
List<String> deletedBlobs =
cleanupStaleRootFiles(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs);
return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum());
}));
}

final Set<String> survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
if (foundIndices.keySet().equals(survivingIndexIds)) {
BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete = foundIndices.entrySet().stream()
.filter(foundIndexEntry -> !survivingIndexIds.contains(foundIndexEntry.getKey()))
.collect(Collectors.toCollection(LinkedBlockingQueue::new));
if (staleIndicesToDelete.isEmpty()) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
cleanupStaleIndices(groupedListener, staleIndicesToDelete);
}
}

Expand Down Expand Up @@ -1040,31 +1042,53 @@ private List<String> cleanupStaleRootFiles(long previousGeneration, Collection<S
return Collections.emptyList();
}

private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds) {
DeleteResult deleteResult = DeleteResult.ZERO;
private void cleanupStaleIndices(GroupedActionListener<DeleteResult> listener, BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete) {
final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> {
DeleteResult deleteResult = DeleteResult.ZERO;
for (DeleteResult result : deleteResults) {
deleteResult = deleteResult.add(result);
}
listener.onResponse(deleteResult);
}, listener::onFailure), staleIndicesToDelete.size());

try {
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
final String indexSnId = indexEntry.getKey();
try {
if (survivingIndexIds.contains(indexSnId) == false) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
deleteResult = deleteResult.add(indexEntry.getValue().delete());
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
}
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
}
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleIndicesToDelete.size());
for (int i = 0; i < workers; ++i) {
executeOneStaleIndexDelete(groupedListener, staleIndicesToDelete);
}
} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
// Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations
// bubbling up and breaking the snapshot functionality.
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e);
listener.onResponse(DeleteResult.ZERO);
}
}

private void executeOneStaleIndexDelete(GroupedActionListener<DeleteResult> listener, BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete) throws InterruptedException {
Map.Entry<String, BlobContainer> indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
if (indexEntry != null) {
final String indexSnId = indexEntry.getKey();
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
DeleteResult deleteResult = DeleteResult.ZERO;
try {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
deleteResult = indexEntry.getValue().delete();
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
} catch (Exception e) {
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale index [{}]", metadata.name(), indexSnId), e);
}

executeOneStaleIndexDelete(listener, staleIndicesToDelete);
return deleteResult;
}));
}
return deleteResult;
}

@Override
Expand Down

0 comments on commit c6adca6

Please sign in to comment.