Parallelize stale index deletion
Today, after deleting a snapshot, we clean up all the now-dangling indices
sequentially, which can be rather slow. With this commit we parallelize the
work across the whole `SNAPSHOT` pool on the master node.

Closes elastic#61513

Co-authored-by: Piyush Daftary <pdaftary@amazon.com>
DaveCTurner and piyushdaftary committed Oct 5, 2023
1 parent 9f72ce0 commit 67867ce
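
As an illustration of the approach, here is a minimal, self-contained sketch of bounded-parallel stale-index cleanup using only the JDK. It is not the Elasticsearch code from this commit: the real change enqueues one cleanup task per stale index on a ThrottledTaskRunner sized to the `SNAPSHOT` pool and ties completion to a RefCountingListener, whereas the sketch approximates that with a fixed thread pool and a semaphore. The class and method names below (ParallelStaleIndexCleanupSketch, deleteIndexFolder) are hypothetical.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class ParallelStaleIndexCleanupSketch {

    public static void main(String[] args) throws InterruptedException {
        List<String> staleIndices = List.of("idx-a", "idx-b", "idx-c", "idx-d");
        int poolSize = 4;                             // stands in for the SNAPSHOT pool size
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Semaphore throttle = new Semaphore(poolSize); // at most poolSize deletions in flight
        AtomicLong blobsDeleted = new AtomicLong();

        for (String index : staleIndices) {
            throttle.acquire();                       // back-pressure: wait for a free slot
            pool.execute(() -> {
                try {
                    // placeholder for deleting the stale index container in the repository
                    blobsDeleted.addAndGet(deleteIndexFolder(index));
                } finally {
                    throttle.release();
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        System.out.println("deleted " + blobsDeleted.get() + " blobs across " + staleIndices.size() + " stale indices");
    }

    // Hypothetical stand-in for deleting one stale index folder; returns the number of blobs removed.
    private static long deleteIndexFolder(String index) {
        return 1;
    }
}

The semaphore caps the number of in-flight deletions at the pool size, mirroring how the commit caps the cleanup work at the size of the `SNAPSHOT` pool rather than deleting indices one at a time.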
Showing 4 changed files with 147 additions and 85 deletions.
@@ -21,10 +21,12 @@
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryConflictException;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;

import java.nio.file.Path;
@@ -295,4 +297,58 @@ public void testRepositoryConflict() throws Exception {
logger.info("--> wait until snapshot deletion is finished");
assertAcked(future.actionGet());
}

public void testLeakedStaleIndicesAreDeletedBySubsequentDelete() throws Exception {
Client client = client();
Path repositoryPath = randomRepoPath();
final String repositoryName = "test-repo";
final String snapshot1Name = "test-snap-1";
final String snapshot2Name = "test-snap-2";

logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
createRepository(repositoryName, "mock", repositoryPath);

logger.info("--> creating index-1 and ingest data");
createIndex("test-idx-1");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-1", Integer.toString(10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating first snapshot");
createFullSnapshot(repositoryName, snapshot1Name);

logger.info("--> creating index-2 and ingest data");
createIndex("test-idx-2");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-2", Integer.toString(10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating second snapshot");
createFullSnapshot(repositoryName, snapshot2Name);

// Make repository throw exceptions when trying to delete stale indices
// This will make sure stale indices stay in the repository after the snapshot delete
final var repository = (MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class)
.repository(repositoryName);
repository.setFailOnDelete(true);

logger.info("--> delete the second snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get();

// Make repository work normally
repository.setFailOnDelete(false);

// This delete should also clean up the residual stale indices leaked by the previous delete
logger.info("--> delete snapshot one");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get();

logger.info("--> check no leftover files");
assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs

logger.info("--> done");
}
}
@@ -38,4 +38,12 @@ public DeleteResult add(DeleteResult other) {
public DeleteResult add(long blobs, long bytes) {
return new DeleteResult(blobsDeleted + blobs, bytesDeleted + bytes);
}

public static DeleteResult of(long blobs, long bytes) {
if (blobs == 0 && bytes == 0) {
return ZERO;
} else {
return new DeleteResult(blobs, bytes);
}
}
}
Expand Up @@ -72,6 +72,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.core.CheckedConsumer;
@@ -390,6 +391,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private final ShardSnapshotTaskRunner shardSnapshotTaskRunner;

private final ThrottledTaskRunner staleBlobDeleteRunner;

/**
* Constructs new BlobStoreRepository
* @param metadata The metadata for this repository including name and settings
@@ -430,6 +433,11 @@ protected BlobStoreRepository(
this::doSnapshotShard,
this::snapshotFile
);
staleBlobDeleteRunner = new ThrottledTaskRunner(
"cleanupStaleBlobs",
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
threadPool.executor(ThreadPool.Names.SNAPSHOT)
);
}

@Override
@@ -1138,30 +1146,53 @@ private void cleanupStaleBlobs(
RepositoryData newRepoData,
ActionListener<DeleteResult> listener
) {
final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(2, ActionListener.wrap(deleteResults -> {
DeleteResult deleteResult = DeleteResult.ZERO;
for (DeleteResult result : deleteResults) {
deleteResult = deleteResult.add(result);
final var blobsDeleted = new AtomicLong();
final var bytesDeleted = new AtomicLong();
try (var listeners = new RefCountingListener(listener.map(ignored -> DeleteResult.of(blobsDeleted.get(), bytesDeleted.get())))) {

final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
if (staleRootBlobs.isEmpty() == false) {
staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> {
try (ref) {
logStaleRootLevelBlobs(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs);
deleteFromContainer(blobContainer(), staleRootBlobs.iterator());
for (final var staleRootBlob : staleRootBlobs) {
bytesDeleted.addAndGet(rootBlobs.get(staleRootBlob).length());
}
blobsDeleted.addAndGet(staleRootBlobs.size());
} catch (Exception e) {
logger.warn(
() -> format(
"[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them",
metadata.name(),
staleRootBlobs
),
e
);
}
}));
}
listener.onResponse(deleteResult);
}, listener::onFailure));

final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
if (staleRootBlobs.isEmpty()) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> {
List<String> deletedBlobs = cleanupStaleRootFiles(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs);
return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum());
}));
}

final Set<String> survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
if (foundIndices.keySet().equals(survivingIndexIds)) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
final var survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
for (final var indexEntry : foundIndices.entrySet()) {
final var indexSnId = indexEntry.getKey();
if (survivingIndexIds.contains(indexSnId)) {
continue;
}
staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> {
try (ref) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
final var deleteResult = indexEntry.getValue().delete(OperationPurpose.SNAPSHOT);
blobsDeleted.addAndGet(deleteResult.blobsDeleted());
bytesDeleted.addAndGet(deleteResult.bytesDeleted());
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
} catch (IOException e) {
logger.warn(() -> format("""
[%s] index %s is no longer part of any snapshot in the repository, \
but failed to clean up its index folder""", metadata.name(), indexSnId), e);
}
}));
}
}
}

@@ -1171,8 +1202,8 @@ private void cleanupStaleBlobs(
* TODO: Add shard level cleanups
* TODO: Add unreferenced index metadata cleanup
* <ul>
* <li>Deleting stale indices {@link #cleanupStaleIndices}</li>
* <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
* <li>Deleting stale indices</li>
* <li>Deleting unreferenced root level blobs</li>
* </ul>
* @param repositoryStateId Current repository state id
* @param repositoryMetaVersion version of the updated repository metadata to write
@@ -1250,70 +1281,25 @@ private static List<String> staleRootBlobs(RepositoryData repositoryData, Set<St
}).toList();
}

private List<String> cleanupStaleRootFiles(
long previousGeneration,
Collection<SnapshotId> deletedSnapshots,
List<String> blobsToDelete
) {
if (blobsToDelete.isEmpty()) {
return blobsToDelete;
}
try {
if (logger.isInfoEnabled()) {
// If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata
// blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot
// delete would also log a confusing INFO message about "stale blobs".
final Set<String> blobNamesToIgnore = deletedSnapshots.stream()
.flatMap(
snapshotId -> Stream.of(
GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()),
SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()),
INDEX_FILE_PREFIX + previousGeneration
)
private void logStaleRootLevelBlobs(long previousGeneration, Collection<SnapshotId> deletedSnapshots, List<String> blobsToDelete) {
if (logger.isInfoEnabled()) {
// If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata
// blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot
// delete would also log a confusing INFO message about "stale blobs".
final Set<String> blobNamesToIgnore = deletedSnapshots.stream()
.flatMap(
snapshotId -> Stream.of(
GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()),
SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()),
INDEX_FILE_PREFIX + previousGeneration
)
.collect(Collectors.toSet());
final List<String> blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList();
if (blobsToLog.isEmpty() == false) {
logger.info("[{}] Found stale root level blobs {}. Cleaning them up", metadata.name(), blobsToLog);
}
}
deleteFromContainer(blobContainer(), blobsToDelete.iterator());
return blobsToDelete;
} catch (Exception e) {
logger.warn(
() -> format(
"[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them",
metadata.name(),
blobsToDelete
),
e
);
}
return Collections.emptyList();
}

private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds) {
DeleteResult deleteResult = DeleteResult.ZERO;
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
final String indexSnId = indexEntry.getKey();
try {
if (survivingIndexIds.contains(indexSnId) == false) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
deleteResult = deleteResult.add(indexEntry.getValue().delete(OperationPurpose.SNAPSHOT));
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
}
} catch (Exception e) {
logger.warn(
() -> format(
"[%s] index %s is no longer part of any snapshot in the repository, " + "but failed to clean up its index folder",
metadata.name(),
indexSnId
),
e
);
)
.collect(Collectors.toSet());
final List<String> blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList();
if (blobsToLog.isEmpty() == false) {
logger.info("[{}] Found stale root level blobs {}. Cleaning them up", metadata.name(), blobsToLog);
}
}
return deleteResult;
}

@Override
@@ -170,6 +170,8 @@ public long getFailureCount() {

private volatile boolean blocked = false;

private volatile boolean failOnDelete = false;

public MockRepository(
RepositoryMetadata metadata,
Environment environment,
@@ -352,6 +354,13 @@ public void setBlockOnceOnReadSnapshotInfoIfAlreadyBlocked() {
blockOnceOnReadSnapshotInfo.set(true);
}

/**
* Sets the fail-on-delete flag: if {@code true}, deleting a blob container throws an {@code IOException}.
*/
public void setFailOnDelete(boolean failOnDelete) {
this.failOnDelete = failOnDelete;
}

public boolean blocked() {
return blocked;
}
@@ -550,6 +559,9 @@ public InputStream readBlob(OperationPurpose purpose, String name, long position

@Override
public DeleteResult delete(OperationPurpose purpose) throws IOException {
if (failOnDelete) {
throw new IOException("simulated delete failure");
}
DeleteResult deleteResult = DeleteResult.ZERO;
for (BlobContainer child : children(purpose).values()) {
deleteResult = deleteResult.add(child.delete(purpose));
