Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make snapshot deletion faster #147

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -929,23 +929,25 @@ private void cleanupStaleBlobs(Collection<SnapshotId> deletedSnapshots, Map<Stri
listener.onResponse(deleteResult);
}, listener::onFailure), 2);

final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
if (staleRootBlobs.isEmpty()) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(groupedListener, () -> {
List<String> deletedBlobs =
cleanupStaleRootFiles(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs);
return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum());
}));
}

final Set<String> survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
if (foundIndices.keySet().equals(survivingIndexIds)) {
BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete = foundIndices.entrySet().stream()
.filter(foundIndexEntry -> !survivingIndexIds.contains(foundIndexEntry.getKey()))
.collect(Collectors.toCollection(LinkedBlockingQueue::new));
if (staleIndicesToDelete.isEmpty()) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
cleanupStaleIndices(groupedListener, staleIndicesToDelete);
}
}

Expand Down Expand Up @@ -1053,31 +1055,57 @@ private List<String> cleanupStaleRootFiles(long previousGeneration, Collection<S
return Collections.emptyList();
}

private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds) {
DeleteResult deleteResult = DeleteResult.ZERO;
private void cleanupStaleIndices(GroupedActionListener<DeleteResult> listener,
BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete) {
final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> {
DeleteResult deleteResult = DeleteResult.ZERO;
for (DeleteResult result : deleteResults) {
deleteResult = deleteResult.add(result);
}
listener.onResponse(deleteResult);
}, listener::onFailure), staleIndicesToDelete.size());

try {
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
final String indexSnId = indexEntry.getKey();
try {
if (survivingIndexIds.contains(indexSnId) == false) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
deleteResult = deleteResult.add(indexEntry.getValue().delete());
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
}
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
}
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleIndicesToDelete.size());
for (int i = 0; i < workers; ++i) {
executeOneStaleIndexDelete(groupedListener, staleIndicesToDelete);
}
} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
// Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations
// bubbling up and breaking the snapshot functionality.
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e);
listener.onResponse(DeleteResult.ZERO);
}
}

private void executeOneStaleIndexDelete(GroupedActionListener<DeleteResult> listener,
BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete)
throws InterruptedException {
Map.Entry<String, BlobContainer> indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
if (indexEntry != null) {
final String indexSnId = indexEntry.getKey();
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
DeleteResult deleteResult = DeleteResult.ZERO;
try {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
deleteResult = indexEntry.getValue().delete();
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
} catch (Exception e) {
assert false : e;
logger.warn(() ->
new ParameterizedMessage("[{}] Exception during cleanup of stale index [{}]", metadata.name(), indexSnId), e);
}

executeOneStaleIndexDelete(listener, staleIndicesToDelete);
return deleteResult;
}));
}
return deleteResult;
}

@Override
Expand Down