Parallelize stale index deletion #100316

Merged
6 changes: 6 additions & 0 deletions docs/changelog/100316.yaml
@@ -0,0 +1,6 @@
pr: 100316
summary: Parallelize stale index deletion
area: Snapshot/Restore
type: enhancement
issues:
- 61513
@@ -21,10 +21,12 @@
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryConflictException;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;

import java.nio.file.Path;
@@ -295,4 +297,58 @@ public void testRepositoryConflict() throws Exception {
logger.info("--> wait until snapshot deletion is finished");
assertAcked(future.actionGet());
}

public void testLeakedStaleIndicesAreDeletedBySubsequentDelete() throws Exception {
Member
This is a cool test and I learnt a few tips and tricks from it. But it does not test the new parallelization change. Do we care?

Contributor Author
This test is largely from the original contributor, but I think it's a reasonable test to write. I've played around with a few ideas for testing the new threading more precisely, but it seems pretty tricky, and it kind of doesn't matter so much as long as we actually do the work somehow. I think I have an idea for a test, though.

Contributor Author
Ok my idea seems to work, see testCleanupStaleBlobsConcurrency added in b56455c (sorry for the force-push)

Client client = client();
Path repositoryPath = randomRepoPath();
final String repositoryName = "test-repo";
final String snapshot1Name = "test-snap-1";
final String snapshot2Name = "test-snap-2";

logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
createRepository(repositoryName, "mock", repositoryPath);

logger.info("--> creating index-1 and ingest data");
createIndex("test-idx-1");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-1", Integer.toString(10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating first snapshot");
createFullSnapshot(repositoryName, snapshot1Name);

logger.info("--> creating index-2 and ingest data");
createIndex("test-idx-2");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-2", Integer.toString(10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating second snapshot");
createFullSnapshot(repositoryName, snapshot2Name);

// Make repository throw exceptions when trying to delete stale indices
// This will make sure stale indices stay in repository after snapshot delete
final var repository = (MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class)
.repository(repositoryName);
repository.setFailOnDeleteContainer(true);

logger.info("--> delete the second snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get();

// Make repository work normally
repository.setFailOnDeleteContainer(false);

// This snapshot should delete last snapshot's residual stale indices as well
logger.info("--> delete snapshot one");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get();

logger.info("--> check no leftover files");
assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs

logger.info("--> done");
}
}
@@ -38,4 +38,12 @@ public DeleteResult add(DeleteResult other) {
public DeleteResult add(long blobs, long bytes) {
return new DeleteResult(blobsDeleted + blobs, bytesDeleted + bytes);
}

public static DeleteResult of(long blobs, long bytes) {
if (blobs == 0 && bytes == 0) {
return ZERO;
} else {
return new DeleteResult(blobs, bytes);
}
}
}
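
As a quick illustration of the new factory (this snippet is not part of the PR; the import path and wrapper class are assumptions for the example), DeleteResult.of reuses the shared ZERO constant for the empty case and otherwise builds a fresh result, which can still be merged with add:

import org.elasticsearch.common.blobstore.DeleteResult; // package assumed for this example

class DeleteResultExample {
    static void demo() {
        DeleteResult empty = DeleteResult.of(0, 0);
        assert empty == DeleteResult.ZERO;               // the empty case reuses the shared constant

        DeleteResult some = DeleteResult.of(3, 4096L);   // 3 blobs, 4096 bytes deleted
        assert some.blobsDeleted() == 3 && some.bytesDeleted() == 4096L;

        // results from independent cleanup steps can still be combined
        DeleteResult total = empty.add(some);
        assert total.blobsDeleted() == 3 && total.bytesDeleted() == 4096L;
    }
}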
@@ -17,6 +17,7 @@

import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
@@ -155,4 +156,30 @@ int runningTasks() {
return runningTasks.get();
}

/**
* Eagerly pull tasks from the queue and execute them on this thread. This must only be used if the tasks in the queue are all
* synchronous, i.e. they release their ref before returning from {@code onResponse()}.
*/
public void runSyncTasksEagerly() {
final AtomicBoolean isDone = new AtomicBoolean(true);
final Releasable ref = () -> isDone.set(true);
ActionListener<Releasable> task;
while ((task = tasks.poll()) != null) {
isDone.set(false);
try {
logger.trace("[{}] eagerly running task {}", taskRunnerName, task);
task.onResponse(ref);
} catch (Exception e) {
logger.error(Strings.format("[%s] task %s failed", taskRunnerName, task), e);
assert false : e;
task.onFailure(e);
return;
}
if (isDone.get() == false) {
logger.error("runSyncTasksEagerly() was called on a queue [{}] containing an async task: [{}]", taskRunnerName, task);
assert false;
Member
I think this is best effort, right? A misbehaving task can still fork but release the reference?

Contributor Author
Yes indeed but that doesn't matter to us really. The ref is just so that the AbstractThrottledTaskRunner can track the relevant activities to completion. If we did fork an untracked task then presumably we're tracking it somewhere else.

return;
}
}
}
}
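
To make the "synchronous task" contract above concrete, here is a minimal usage sketch. It is not taken from the PR: the runner name, queue size, executor variable and the deleteOneStaleBlob helper are invented for illustration, but the enqueueTask / ActionListener<Releasable> shape matches how BlobStoreRepository uses this runner below.

// Sketch only: a task that releases its ref before onResponse() returns, which is the
// precondition runSyncTasksEagerly() documents above.
ThrottledTaskRunner runner = new ThrottledTaskRunner("example-cleanup", 5, snapshotExecutor);

runner.enqueueTask(new ActionListener<Releasable>() {
    @Override
    public void onResponse(Releasable ref) {
        try (ref) {                  // released before returning, so the task is synchronous
            deleteOneStaleBlob();    // hypothetical blocking cleanup step
        }
    }

    @Override
    public void onFailure(Exception e) {
        // called if the task could not run normally, e.g. onResponse threw (see above)
    }
});

// Drain any still-queued tasks on the calling thread rather than waiting for the
// throttled executor, mirroring the dedicated-SNAPSHOT-thread pattern used below.
runner.runSyncTasksEagerly();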
@@ -70,8 +70,10 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.core.CheckedConsumer;
@@ -390,6 +392,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private final ShardSnapshotTaskRunner shardSnapshotTaskRunner;

private final ThrottledTaskRunner staleBlobDeleteRunner;

/**
* Constructs new BlobStoreRepository
* @param metadata The metadata for this repository including name and settings
@@ -430,6 +434,11 @@ protected BlobStoreRepository(
this::doSnapshotShard,
this::snapshotFile
);
staleBlobDeleteRunner = new ThrottledTaskRunner(
"cleanupStaleBlobs",
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
threadPool.executor(ThreadPool.Names.SNAPSHOT)
);
}

@Override
@@ -1138,31 +1147,84 @@ private void cleanupStaleBlobs(
RepositoryData newRepoData,
ActionListener<DeleteResult> listener
) {
final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(2, ActionListener.wrap(deleteResults -> {
DeleteResult deleteResult = DeleteResult.ZERO;
for (DeleteResult result : deleteResults) {
deleteResult = deleteResult.add(result);
final var blobsDeleted = new AtomicLong();
final var bytesDeleted = new AtomicLong();
try (var listeners = new RefCountingListener(listener.map(ignored -> DeleteResult.of(blobsDeleted.get(), bytesDeleted.get())))) {

final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
if (staleRootBlobs.isEmpty() == false) {
staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> {
try (ref) {
logStaleRootLevelBlobs(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs);
deleteFromContainer(blobContainer(), staleRootBlobs.iterator());
for (final var staleRootBlob : staleRootBlobs) {
bytesDeleted.addAndGet(rootBlobs.get(staleRootBlob).length());
}
blobsDeleted.addAndGet(staleRootBlobs.size());
} catch (Exception e) {
logger.warn(
() -> format(
"[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them",
metadata.name(),
staleRootBlobs
),
e
);
}
}));
}
listener.onResponse(deleteResult);
}, listener::onFailure));

final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
if (staleRootBlobs.isEmpty()) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> {
List<String> deletedBlobs = cleanupStaleRootFiles(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs);
return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum());
}));
final var survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
for (final var indexEntry : foundIndices.entrySet()) {
final var indexSnId = indexEntry.getKey();
if (survivingIndexIds.contains(indexSnId)) {
continue;
}
staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> {
try (ref) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
final var deleteResult = indexEntry.getValue().delete(OperationPurpose.SNAPSHOT);
blobsDeleted.addAndGet(deleteResult.blobsDeleted());
bytesDeleted.addAndGet(deleteResult.bytesDeleted());
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
} catch (IOException e) {
logger.warn(() -> format("""
[%s] index %s is no longer part of any snapshot in the repository, \
but failed to clean up its index folder""", metadata.name(), indexSnId), e);
}
}));
Comment on lines +1188 to +1206
Member
Do we want to keep this and the block above in their own separate methods, as they are now? Fewer nesting levels could be helpful.

Contributor Author
I inlined these because we ended up having to pass really quite a lot of parameters in, and it wasn't really even a coherent set of parameters so much as just "the things needed to run this method". It's still less than one screenful (on my screen anyway) and nicely exposes the execution pattern, so tbh I prefer it as it is now.

Contributor Author
This is actually kind of a code smell throughout this class (and the snapshot codebase more generally). I have been working on a refactoring that should help simplify things in this area and will open a followup in the next few days.

}
}

final Set<String> survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
if (foundIndices.keySet().equals(survivingIndexIds)) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
}
// If we did the cleanup of stale indices purely using a throttled executor then there would be no backpressure to prevent us from
// falling arbitrarily far behind. But nor do we want to dedicate all the SNAPSHOT threads to stale index cleanups because that
// would slow down other snapshot operations in situations that do not need backpressure.
//
// The solution is to dedicate one SNAPSHOT thread to doing the cleanups eagerly, alongside the throttled executor which spreads
// the rest of the work across the other threads if they are free. If the eager cleanup loop doesn't finish before the next one
// starts then we dedicate another SNAPSHOT thread to the deletions, and so on, until eventually either we catch up or the SNAPSHOT
// pool is fully occupied with blob deletions, which pushes back on other snapshot operations.

threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
@Override
protected void doRun() {
staleBlobDeleteRunner.runSyncTasksEagerly();
}

@Override
public void onFailure(Exception e) {
logger.error("unexpected failure while processing deletes on dedicated snapshot thread", e);
assert false : e;
}

@Override
public void onRejection(Exception e) {
if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) {
return;
}
super.onRejection(e);
}
});
Member
Can we have this managed inside the task runner? It would be helpful for reuse.

Contributor Author
Yes that's a good point. With only one caller it's hard to know where to put the abstraction boundary, but I can see value in allowing callers to pass in an Executor. They can always use EsExecutors.DIRECT_EXECUTOR_SERVICE if they really don't want to fork.
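
A rough sketch of the shape such a follow-up could take (purely illustrative, not something this PR adds): the runner would own the forking decision by accepting the executor to drain on, and callers that really do not want to fork could pass EsExecutors.DIRECT_EXECUTOR_SERVICE.

// Hypothetical follow-up, not part of this PR: an overload that takes the executor on
// which to run the eager drain.
public void runSyncTasksEagerly(Executor executor) {
    executor.execute(new AbstractRunnable() {
        @Override
        protected void doRun() {
            runSyncTasksEagerly();   // the existing eager drain, unchanged
        }

        @Override
        public void onFailure(Exception e) {
            logger.error("unexpected failure while eagerly draining task queue", e);
            assert false : e;
        }

        @Override
        public void onRejection(Exception e) {
            if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) {
                return;   // the executor is shutting down; skip the eager drain
            }
            super.onRejection(e);
        }
    });
}

// A caller that prefers to stay on the current thread:
// runner.runSyncTasksEagerly(EsExecutors.DIRECT_EXECUTOR_SERVICE);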

}

/**
@@ -1171,8 +1233,8 @@ private void cleanupStaleBlobs(
* TODO: Add shard level cleanups
* TODO: Add unreferenced index metadata cleanup
* <ul>
* <li>Deleting stale indices {@link #cleanupStaleIndices}</li>
* <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
* <li>Deleting stale indices</li>
* <li>Deleting unreferenced root level blobs</li>
* </ul>
* @param repositoryStateId Current repository state id
* @param repositoryMetaVersion version of the updated repository metadata to write
@@ -1250,70 +1312,25 @@ private static List<String> staleRootBlobs(RepositoryData repositoryData, Set<St
}).toList();
}

private List<String> cleanupStaleRootFiles(
long previousGeneration,
Collection<SnapshotId> deletedSnapshots,
List<String> blobsToDelete
) {
if (blobsToDelete.isEmpty()) {
return blobsToDelete;
}
try {
if (logger.isInfoEnabled()) {
// If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata
// blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot
// delete would also log a confusing INFO message about "stale blobs".
final Set<String> blobNamesToIgnore = deletedSnapshots.stream()
.flatMap(
snapshotId -> Stream.of(
GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()),
SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()),
INDEX_FILE_PREFIX + previousGeneration
)
private void logStaleRootLevelBlobs(long previousGeneration, Collection<SnapshotId> deletedSnapshots, List<String> blobsToDelete) {
if (logger.isInfoEnabled()) {
// If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata
// blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot
// delete would also log a confusing INFO message about "stale blobs".
final Set<String> blobNamesToIgnore = deletedSnapshots.stream()
.flatMap(
snapshotId -> Stream.of(
GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()),
SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()),
INDEX_FILE_PREFIX + previousGeneration
)
.collect(Collectors.toSet());
final List<String> blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList();
if (blobsToLog.isEmpty() == false) {
logger.info("[{}] Found stale root level blobs {}. Cleaning them up", metadata.name(), blobsToLog);
}
}
deleteFromContainer(blobContainer(), blobsToDelete.iterator());
return blobsToDelete;
} catch (Exception e) {
logger.warn(
() -> format(
"[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them",
metadata.name(),
blobsToDelete
),
e
);
}
return Collections.emptyList();
}

private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds) {
DeleteResult deleteResult = DeleteResult.ZERO;
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
final String indexSnId = indexEntry.getKey();
try {
if (survivingIndexIds.contains(indexSnId) == false) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
deleteResult = deleteResult.add(indexEntry.getValue().delete(OperationPurpose.SNAPSHOT));
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
}
} catch (Exception e) {
logger.warn(
() -> format(
"[%s] index %s is no longer part of any snapshot in the repository, " + "but failed to clean up its index folder",
metadata.name(),
indexSnId
),
e
);
)
.collect(Collectors.toSet());
final List<String> blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList();
if (blobsToLog.isEmpty() == false) {
logger.info("[{}] Found stale root level blobs {}. Cleaning them up", metadata.name(), blobsToLog);
}
}
return deleteResult;
}

@Override