Speed up snapshot stale indices delete
piyushdaftary committed Nov 2, 2020
1 parent 99dac25 commit 2dfbb9c
Showing 3 changed files with 135 additions and 13 deletions.
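In summary, the change below speeds up stale index cleanup during snapshot deletion by parallelising it: instead of deleting each stale index folder one after another in a single loop, cleanupStaleIndices now submits one delete task per stale index to the provided executor and combines the individual DeleteResults through a GroupedActionListener before reporting the total upstream. The sketch below is only an illustrative model of that fan-out-and-aggregate pattern in plain Java; it does not use the Elasticsearch APIs involved (BlobContainer, ActionRunnable, GroupedActionListener), and StaleIndexCleanupSketch / deleteStaleIndex are hypothetical names used for illustration.

import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Illustrative sketch only: models the "fan out the per-index deletes, then
// combine the results" pattern introduced in this commit. It does not use the
// actual Elasticsearch APIs (BlobContainer, ActionRunnable, GroupedActionListener).
public class StaleIndexCleanupSketch {

    // Hypothetical stand-in for BlobContainer.delete(); returns blobs deleted.
    static long deleteStaleIndex(String indexId) {
        System.out.println("deleting stale index " + indexId);
        return 1L;
    }

    public static void main(String[] args) {
        List<String> foundIndices = List.of("idx-a", "idx-b", "idx-c");
        Set<String> survivingIndexIds = Set.of("idx-a");
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // Fan out: one task per stale index instead of deleting them sequentially.
        List<CompletableFuture<Long>> deletes = foundIndices.stream()
            .filter(id -> survivingIndexIds.contains(id) == false)
            .map(id -> CompletableFuture.supplyAsync(() -> {
                try {
                    return deleteStaleIndex(id);
                } catch (Exception e) {
                    // A single failed delete must not fail the whole cleanup;
                    // the real change logs a warning and contributes a zero result.
                    return 0L;
                }
            }, executor))
            .collect(Collectors.toList());

        // Aggregate: wait for every delete and sum the results, analogous to the
        // GroupedActionListener that combines the per-index DeleteResults.
        long totalDeleted = CompletableFuture.allOf(deletes.toArray(new CompletableFuture[0]))
            .thenApply(v -> deletes.stream().mapToLong(CompletableFuture::join).sum())
            .join();

        System.out.println("deleted " + totalDeleted + " blobs from stale indices");
        executor.shutdown();
    }
}

The real change additionally converts a per-index IOException into DeleteResult.ZERO after logging a warning, so a single failed delete cannot fail the whole cleanup; the two new integration tests below exercise exactly that behaviour by injecting delete failures through MockRepository.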
@@ -21,6 +21,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
@@ -30,8 +31,10 @@
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;

import java.nio.file.Path;
@@ -42,6 +45,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.greaterThan;

@ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
@@ -225,4 +229,106 @@ public void testRepositoryVerification() {
assertThat(ExceptionsHelper.stackTrace(ex), containsString("is not shared"));
}
}

public void testSnapshotDeleteWithRepositoryOutage() throws Exception {
disableRepoConsistencyCheck("This test expects residual files in repository");
Client client = client();
Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

logger.info("--> creating index-1 and ingest data");
createIndex("test-idx-1");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-1", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
}
refresh();

// Make the repository throw an exception when trying to delete stale indices
String masterNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo"))
.setThrowExceptionWhileDelete(true);

logger.info("--> creating snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
.setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

logger.info("--> delete the snapshot");
client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-1").get();

logger.info("--> done");
}

public void testResidualStaleIndicesAreDeletedByConsecutiveDelete() throws Exception {
Client client = client();
Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

int numberOfFiles = numberOfFiles(repo);

logger.info("--> creating index-1 and ingest data");
createIndex("test-idx-1");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-1", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating first snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
.setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

logger.info("--> creating index-2 and ingest data");
createIndex("test-idx-2");
ensureGreen();
for (int j = 0; j < 10; j++) {
indexDoc("test-idx-2", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating second snapshot");
CreateSnapshotResponse createSnapshotResponse1 = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2")
.setWaitForCompletion(true).get();
assertThat(createSnapshotResponse1.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse1.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse1.getSnapshotInfo().totalShards()));

// Make the repository throw an exception when trying to delete stale indices
// This will make sure stale indices stay in the repository after the snapshot delete
String masterNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo")).
setThrowExceptionWhileDelete(true);

logger.info("--> delete the second snapshot");
client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-2").get();

// Make the repository work normally again
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo")).
setThrowExceptionWhileDelete(false);

// This snapshot delete should also clean up the residual stale indices left by the previous delete
logger.info("--> delete snapshot one");
client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-1").get();

logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
assertFileCount(repo, numberOfFiles + 2);

logger.info("--> done");
}
}
@@ -919,7 +919,7 @@ private void cleanupStaleBlobs(Collection<SnapshotId> deletedSnapshots, Map<Stri
if (foundIndices.keySet().equals(survivingIndexIds)) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
cleanupStaleIndices(foundIndices, survivingIndexIds, executor, groupedListener);
}
}

@@ -1027,22 +1027,31 @@ private List<String> cleanupStaleRootFiles(long previousGeneration, Collection<S
return Collections.emptyList();
}

private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds) {
DeleteResult deleteResult = DeleteResult.ZERO;
private void cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds,
Executor executor, GroupedActionListener<DeleteResult> listener) {
final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> {
DeleteResult deleteResult = DeleteResult.ZERO;
for (DeleteResult result : deleteResults) {
deleteResult = deleteResult.add(result);
}
listener.onResponse(deleteResult);
}, listener::onFailure), foundIndices.size() - survivingIndexIds.size());

try {
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
final String indexSnId = indexEntry.getKey();
try {
if (survivingIndexIds.contains(indexSnId) == false) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
deleteResult = deleteResult.add(indexEntry.getValue().delete());
executor.execute(ActionRunnable.supply(groupedListener, () -> {
try {
DeleteResult staleIndexDeleteResult = indexEntry.getValue().delete();
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
return staleIndexDeleteResult;
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
return DeleteResult.ZERO;
}
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
}
}));
}
} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
@@ -1051,7 +1060,6 @@ private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e);
}
return deleteResult;
}

@Override
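A note on the aggregation above: the GroupedActionListener is sized to the number of stale indices (foundIndices.size() - survivingIndexIds.size()) and notifies the outer listener only once that many per-index results have come back; since each task reports DeleteResult.ZERO when its delete fails, every task produces a response and the group completes even when the repository misbehaves. The snippet below is a simplified, assumption-based model of that counting behaviour in plain Java, not the actual Elasticsearch GroupedActionListener; CountingGroupListener is a hypothetical name.

import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Simplified model: collect a fixed number of results, then notify one consumer.
class CountingGroupListener<T> {
    private final Queue<T> results = new ConcurrentLinkedQueue<>();
    private final AtomicInteger remaining;
    private final Consumer<Collection<T>> onAllDone;

    CountingGroupListener(int expectedResponses, Consumer<Collection<T>> onAllDone) {
        this.remaining = new AtomicInteger(expectedResponses);
        this.onAllDone = onAllDone;
    }

    // Called once per completed per-index delete; the last response fires the delegate.
    void onResponse(T result) {
        results.add(result);
        if (remaining.decrementAndGet() == 0) {
            onAllDone.accept(results);
        }
    }
}

Modelling it this way makes clear why the group size must equal the number of submitted tasks: an undercounted group would fire early, while an overcounted one would never fire.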
@@ -143,6 +143,7 @@ public long getFailureCount() {
private volatile boolean throwReadErrorAfterUnblock = false;

private volatile boolean blocked = false;
private volatile boolean throwExceptionWhileDelete;

public MockRepository(RepositoryMetadata metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService, BigArrays bigArrays,
@@ -246,6 +247,10 @@ public void setFailReadsAfterUnblock(boolean failReadsAfterUnblock) {
this.failReadsAfterUnblock = failReadsAfterUnblock;
}

public void setThrowExceptionWhileDelete(boolean throwError) {
throwExceptionWhileDelete = throwError;
}

public boolean blocked() {
return blocked;
}
@@ -414,6 +419,9 @@ public InputStream readBlob(String name, long position, long length) throws IOEx
@Override
public DeleteResult delete() throws IOException {
DeleteResult deleteResult = DeleteResult.ZERO;
if (throwExceptionWhileDelete) {
throw new IOException("Random exception");
}
for (BlobContainer child : children().values()) {
deleteResult = deleteResult.add(child.delete());
}
