Skip to content

Commit

Permalink
Enable Parallel Deletes in Azure Repository (elastic#42783) (elastic#43886)
Browse files Browse the repository at this point in the history

* Parallel deletes via private thread pool
  • Loading branch information
original-brownbear authored Jul 3, 2019
1 parent 365dfe8 commit 826f38c
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ testClusters.integTest {
// in a hacky way to change the protocol and endpoint. We must fix that.
setting 'azure.client.integration_test.endpoint_suffix',
{ "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=http://${azureStorageFixture.addressAndPort }" }
String firstPartOfSeed = project.rootProject.testSeed.tokenize(':').get(0)
setting 'thread_pool.repository_azure.max', (Math.abs(Long.parseUnsignedLong(firstPartOfSeed, 16) % 10) + 1).toString()
} else {
println "Using an external service to test the repository-azure plugin"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,37 @@
import com.microsoft.azure.storage.StorageException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

public class AzureBlobContainer extends AbstractBlobContainer {

private final Logger logger = LogManager.getLogger(AzureBlobContainer.class);
private final AzureBlobStore blobStore;

private final ThreadPool threadPool;
private final String keyPath;

public AzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
AzureBlobContainer(BlobPath path, AzureBlobStore blobStore, ThreadPool threadPool) {
super(path);
this.blobStore = blobStore;
this.keyPath = path.buildAsString();
this.threadPool = threadPool;
}

@Override
Expand Down Expand Up @@ -117,6 +125,32 @@ public void deleteBlob(String blobName) throws IOException {
}
}

/**
 * Deletes every blob named in {@code blobNames} by scheduling one delete task per blob on the
 * {@link AzureRepositoryPlugin#REPOSITORY_THREAD_POOL_NAME} executor, then waits on the future that the
 * grouped listener feeds before returning.
 *
 * @param blobNames names of the blobs to delete; an empty list returns immediately without touching the executor
 * @throws IOException if waiting on the combined result fails; the underlying exception is attached as the cause
 */
@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
    if (blobNames.isEmpty()) {
        return;
    }
    final PlainActionFuture<Collection<Void>> result = PlainActionFuture.newFuture();
    // One response (or failure) is reported to this listener per blob name.
    final GroupedActionListener<Void> listener = new GroupedActionListener<>(result, blobNames.size());
    final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
    // Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint.
    // TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
    for (String blobName : blobNames) {
        // execute() rather than submit(): submit() wraps the task in a FutureTask whose result nobody reads and which
        // hides the ActionRunnable from the executor. Handing over the ActionRunnable itself lets an executor that
        // special-cases such runnables (NOTE(review): Elasticsearch's thread pool executors are expected to) report a
        // rejection through the listener instead of throwing out of this loop.
        executor.execute(new ActionRunnable<Void>(listener) {
            @Override
            protected void doRun() throws IOException {
                deleteBlobIgnoringIfNotExists(blobName);
                listener.onResponse(null);
            }
        });
    }
    try {
        result.actionGet();
    } catch (Exception e) {
        throw new IOException("Exception during bulk delete", e);
    }
}

@Override
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String prefix) throws IOException {
logger.trace("listBlobsByPrefix({})", prefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -40,15 +41,17 @@
public class AzureBlobStore implements BlobStore {

private final AzureStorageService service;
private final ThreadPool threadPool;

private final String clientName;
private final String container;
private final LocationMode locationMode;

public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service) {
public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service, ThreadPool threadPool) {
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
this.service = service;
this.threadPool = threadPool;
// locationMode is set per repository, not per client
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
Expand All @@ -70,7 +73,7 @@ public LocationMode getLocationMode() {

@Override
public BlobContainer blobContainer(BlobPath path) {
return new AzureBlobContainer(path, this);
return new AzureBlobContainer(path, this, threadPool);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,13 @@ public static final class Repository {

private final BlobPath basePath;
private final ByteSizeValue chunkSize;
private final Environment environment;
private final AzureStorageService storageService;
private final boolean readonly;

public AzureRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry,
AzureStorageService storageService, ThreadPool threadPool) {
super(metadata, environment.settings(), Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool);
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.environment = environment;
this.storageService = storageService;

final String basePath = Strings.trimLeadingCharacter(Repository.BASE_PATH_SETTING.get(metadata.settings()), '/');
Expand Down Expand Up @@ -115,7 +113,7 @@ protected BlobStore getBlobStore() {

@Override
protected AzureBlobStore createBlobStore() {
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, threadPool);

logger.debug(() -> new ParameterizedMessage(
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
Expand All @@ -40,6 +43,8 @@
*/
public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {

/** Name under which this plugin's executor is built and later looked up from the {@code ThreadPool}. */
public static final String REPOSITORY_THREAD_POOL_NAME = "repository_azure";

// protected for testing
final AzureStorageService azureStoreService;

Expand Down Expand Up @@ -70,6 +75,15 @@ public List<Setting<?>> getSettings() {
);
}

/**
 * Returns the executor builders contributed by this plugin: a single-element list holding the
 * builder produced by {@link #executorBuilder()}.
 *
 * @param settings node settings; not consulted by this implementation
 * @return an immutable one-element list containing the Azure repository executor builder
 */
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
    final ExecutorBuilder<?> azureExecutorBuilder = executorBuilder();
    return Collections.singletonList(azureExecutorBuilder);
}

/**
 * Creates the builder for the {@link #REPOSITORY_THREAD_POOL_NAME} executor.
 * Exposed statically so callers can construct a thread pool containing this executor without a plugin instance.
 *
 * @return a new scaling executor builder configured with 0, 32 and a 30 second time value
 */
public static ExecutorBuilder<?> executorBuilder() {
    // NOTE(review): argument roles (core size, max size, keep-alive) follow ScalingExecutorBuilder's constructor — confirm there.
    final int coreThreads = 0;
    final int maxThreads = 32;
    final TimeValue keepAlive = TimeValue.timeValueSeconds(30L);
    return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, coreThreads, maxThreads, keepAlive);
}

@Override
public void reload(Settings settings) {
// secure settings should be readable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,31 @@
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

public class AzureBlobStoreContainerTests extends ESBlobStoreContainerTestCase {

private ThreadPool threadPool;

/**
 * Runs the parent setup, then creates the thread pool (including the Azure repository executor)
 * that is handed to every blob store built by this test.
 */
@Override
public void setUp() throws Exception {
    super.setUp();
    // Name the pool after this test class (it previously carried the sibling test's name) so the pool name
    // identifies the right test.
    threadPool = new TestThreadPool("AzureBlobStoreContainerTests", AzureRepositoryPlugin.executorBuilder());
}

/**
 * Runs the parent teardown and then terminates the thread pool created in {@code setUp()},
 * allowing it up to 10 seconds.
 */
@Override
public void tearDown() throws Exception {
    try {
        super.tearDown();
    } finally {
        // Terminate the pool even when the parent teardown throws, so its threads are not left running.
        ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
    }
}

@Override
protected BlobStore newBlobStore() {
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
AzureStorageServiceMock client = new AzureStorageServiceMock();
return new AzureBlobStore(repositoryMetaData, client);
return new AzureBlobStore(repositoryMetaData, client, threadPool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,31 @@
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.ESBlobStoreTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

public class AzureBlobStoreTests extends ESBlobStoreTestCase {

private ThreadPool threadPool;

/**
 * Performs the parent setup first, then builds the test thread pool, which includes the
 * Azure repository executor, for use by the blob stores this test creates.
 */
@Override
public void setUp() throws Exception {
    super.setUp();
    this.threadPool = new TestThreadPool(
        "AzureBlobStoreTests",
        AzureRepositoryPlugin.executorBuilder()
    );
}

/**
 * Runs the parent teardown and then terminates the thread pool created in {@code setUp()},
 * allowing it up to 10 seconds.
 */
@Override
public void tearDown() throws Exception {
    try {
        super.tearDown();
    } finally {
        // Terminate the pool even when the parent teardown throws, so its threads are not left running.
        ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
    }
}

@Override
protected BlobStore newBlobStore() {
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
AzureStorageServiceMock client = new AzureStorageServiceMock();
return new AzureBlobStore(repositoryMetaData, client);
return new AzureBlobStore(repositoryMetaData, client, threadPool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

protected final NamedXContentRegistry namedXContentRegistry;

private final ThreadPool threadPool;
protected final ThreadPool threadPool;

private static final int BUFFER_SIZE = 4096;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@

public class TestThreadPool extends ThreadPool {

public TestThreadPool(String name) {
this(name, Settings.EMPTY);
public TestThreadPool(String name, ExecutorBuilder<?>... customBuilders) {
this(name, Settings.EMPTY, customBuilders);
}

public TestThreadPool(String name, Settings settings) {
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build());
public TestThreadPool(String name, Settings settings, ExecutorBuilder<?>... customBuilders) {
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build(), customBuilders);
}

}

0 comments on commit 826f38c

Please sign in to comment.