Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up snapshot stale indices delete #64513

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
Expand All @@ -30,8 +31,10 @@
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;

import java.nio.file.Path;
Expand All @@ -42,6 +45,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.greaterThan;

@ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
Expand Down Expand Up @@ -225,4 +229,106 @@ public void testRepositoryVerification() {
assertThat(ExceptionsHelper.stackTrace(ex), containsString("is not shared"));
}
}

public void testSnapshotDeleteWithRepositoryOutage() throws Exception {
piyushdaftary marked this conversation as resolved.
Show resolved Hide resolved
disableRepoConsistencyCheck("This test expects residual files in repository");
Client client = client();
Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

logger.info("--> creating index-1 and ingest data");
createIndex("test-idx-1");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-1", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
}
refresh();

// Make repository to throw exception when trying to delete stale indices
String masterNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo"))
.setThrowExceptionWhileDelete(true);

logger.info("--> creating snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
piyushdaftary marked this conversation as resolved.
Show resolved Hide resolved
.setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

logger.info("--> delete the snapshot");
client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-1").get();

logger.info("--> done");
}

public void testResidualStaleIndicesAreDeletedByConsecutiveDelete() throws Exception {
Client client = client();
Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
piyushdaftary marked this conversation as resolved.
Show resolved Hide resolved
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

int numberOfFiles = numberOfFiles(repo);

logger.info("--> creating index-1 and ingest data");
createIndex("test-idx-1");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-1", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating first snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
.setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

logger.info("--> creating index-2 and ingest data");
createIndex("test-idx-2");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-2", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating second snapshot");
CreateSnapshotResponse createSnapshotResponse1 = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2")
.setWaitForCompletion(true).get();
assertThat(createSnapshotResponse1.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse1.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse1.getSnapshotInfo().totalShards()));

// Make repository to throw exception when trying to delete stale indices
// This will make sure stale indices stays in repository after snapshot delete
String masterNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo")).
setThrowExceptionWhileDelete(true);

logger.info("--> delete the second snapshot");
client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-2").get();

// Make repository to work normally
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo")).
setThrowExceptionWhileDelete(false);

// This snapshot should delete last snapshot's residual stale indices as well
logger.info("--> delete snapshot one");
client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-1").get();

logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
assertFileCount(repo, numberOfFiles + 2);

logger.info("--> done");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ private void cleanupStaleBlobs(Collection<SnapshotId> deletedSnapshots, Map<Stri
if (foundIndices.keySet().equals(survivingIndexIds)) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
cleanupStaleIndices(foundIndices, survivingIndexIds, executor, groupedListener);
}
}

Expand Down Expand Up @@ -1027,22 +1027,27 @@ private List<String> cleanupStaleRootFiles(long previousGeneration, Collection<S
return Collections.emptyList();
}

private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds) {
DeleteResult deleteResult = DeleteResult.ZERO;
private void cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds,
Executor executor, GroupedActionListener<DeleteResult> listener) {
final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> {
DeleteResult deleteResult = DeleteResult.ZERO;
for (DeleteResult result : deleteResults) {
deleteResult = deleteResult.add(result);
}
listener.onResponse(deleteResult);
}, listener::onFailure), foundIndices.size() - survivingIndexIds.size());

try {
final BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete = new LinkedBlockingQueue<>();
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
final String indexSnId = indexEntry.getKey();
try {
if (survivingIndexIds.contains(indexSnId) == false) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
deleteResult = deleteResult.add(indexEntry.getValue().delete());
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
}
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
}
staleIndicesToDelete.put(indexEntry);
piyushdaftary marked this conversation as resolved.
Show resolved Hide resolved
}

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
foundIndices.size() - survivingIndexIds.size());
for (int i = 0; i < workers; ++i) {
executeOneStaeIndexDelete( staleIndicesToDelete, executor, groupedListener);
piyushdaftary marked this conversation as resolved.
Show resolved Hide resolved
}
} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
Expand All @@ -1051,7 +1056,29 @@ private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e);
}
return deleteResult;
}

private void executeOneStaeIndexDelete(BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete, Executor executor,
GroupedActionListener<DeleteResult> listener) throws InterruptedException {
Map.Entry<String, BlobContainer> indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
if (indexEntry == null) {
listener.onResponse(DeleteResult.ZERO);
} else {
final String indexSnId = indexEntry.getKey();
executor.execute(ActionRunnable.supply(listener, () -> {
piyushdaftary marked this conversation as resolved.
Show resolved Hide resolved
try {
DeleteResult staleIndexDeleteResult = indexEntry.getValue().delete();
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
executeOneStaeIndexDelete( staleIndicesToDelete, executor, listener);
return staleIndexDeleteResult;
} catch (IOException e) {
piyushdaftary marked this conversation as resolved.
Show resolved Hide resolved
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
return DeleteResult.ZERO;
}
}));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public long getFailureCount() {
private volatile boolean throwReadErrorAfterUnblock = false;

private volatile boolean blocked = false;
private volatile boolean setThrowExceptionWhileDelete;

public MockRepository(RepositoryMetadata metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService, BigArrays bigArrays,
Expand Down Expand Up @@ -246,6 +247,10 @@ public void setFailReadsAfterUnblock(boolean failReadsAfterUnblock) {
this.failReadsAfterUnblock = failReadsAfterUnblock;
}

/**
 * When set to {@code true}, subsequent {@code delete()} calls on this mock repository's blob
 * containers fail with an {@link IOException} instead of deleting anything.
 */
public void setThrowExceptionWhileDelete(boolean throwError) {
    this.setThrowExceptionWhileDelete = throwError;
}

/** Returns the current value of the volatile {@code blocked} flag. */
public boolean blocked() {
    return this.blocked;
}
Expand Down Expand Up @@ -414,6 +419,9 @@ public InputStream readBlob(String name, long position, long length) throws IOEx
@Override
public DeleteResult delete() throws IOException {
DeleteResult deleteResult = DeleteResult.ZERO;
if (setThrowExceptionWhileDelete) {
throw new IOException("Random exception");
}
for (BlobContainer child : children().values()) {
deleteResult = deleteResult.add(child.delete());
}
Expand Down