Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Parallel Deletes in Azure Repository (#42783) #43886

Merged
merged 1 commit into from
Jul 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ testClusters.integTest {
// in a hacky way to change the protocol and endpoint. We must fix that.
setting 'azure.client.integration_test.endpoint_suffix',
{ "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=http://${azureStorageFixture.addressAndPort }" }
String firstPartOfSeed = project.rootProject.testSeed.tokenize(':').get(0)
setting 'thread_pool.repository_azure.max', (Math.abs(Long.parseUnsignedLong(firstPartOfSeed, 16) % 10) + 1).toString()
} else {
println "Using an external service to test the repository-azure plugin"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,37 @@
import com.microsoft.azure.storage.StorageException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

public class AzureBlobContainer extends AbstractBlobContainer {

private final Logger logger = LogManager.getLogger(AzureBlobContainer.class);
private final AzureBlobStore blobStore;

private final ThreadPool threadPool;
private final String keyPath;

public AzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
AzureBlobContainer(BlobPath path, AzureBlobStore blobStore, ThreadPool threadPool) {
super(path);
this.blobStore = blobStore;
this.keyPath = path.buildAsString();
this.threadPool = threadPool;
}

@Override
Expand Down Expand Up @@ -117,6 +125,32 @@ public void deleteBlob(String blobName) throws IOException {
}
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
if (blobNames.isEmpty()) {
return;
}
final PlainActionFuture<Collection<Void>> result = PlainActionFuture.newFuture();
final GroupedActionListener<Void> listener = new GroupedActionListener<>(result, blobNames.size());
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint.
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
for (String blobName : blobNames) {
executor.submit(new ActionRunnable<Void>(listener) {
@Override
protected void doRun() throws IOException {
deleteBlobIgnoringIfNotExists(blobName);
listener.onResponse(null);
}
});
}
try {
result.actionGet();
} catch (Exception e) {
throw new IOException("Exception during bulk delete", e);
}
}

@Override
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String prefix) throws IOException {
logger.trace("listBlobsByPrefix({})", prefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -40,15 +41,17 @@
public class AzureBlobStore implements BlobStore {

private final AzureStorageService service;
private final ThreadPool threadPool;

private final String clientName;
private final String container;
private final LocationMode locationMode;

public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service) {
public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service, ThreadPool threadPool) {
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
this.service = service;
this.threadPool = threadPool;
// locationMode is set per repository, not per client
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
Expand All @@ -70,7 +73,7 @@ public LocationMode getLocationMode() {

@Override
public BlobContainer blobContainer(BlobPath path) {
return new AzureBlobContainer(path, this);
return new AzureBlobContainer(path, this, threadPool);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,13 @@ public static final class Repository {

private final BlobPath basePath;
private final ByteSizeValue chunkSize;
private final Environment environment;
private final AzureStorageService storageService;
private final boolean readonly;

public AzureRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry,
AzureStorageService storageService, ThreadPool threadPool) {
super(metadata, environment.settings(), Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool);
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.environment = environment;
this.storageService = storageService;

final String basePath = Strings.trimLeadingCharacter(Repository.BASE_PATH_SETTING.get(metadata.settings()), '/');
Expand Down Expand Up @@ -115,7 +113,7 @@ protected BlobStore getBlobStore() {

@Override
protected AzureBlobStore createBlobStore() {
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, threadPool);

logger.debug(() -> new ParameterizedMessage(
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
Expand All @@ -40,6 +43,8 @@
*/
public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {

public static final String REPOSITORY_THREAD_POOL_NAME = "repository_azure";

// protected for testing
final AzureStorageService azureStoreService;

Expand Down Expand Up @@ -70,6 +75,15 @@ public List<Setting<?>> getSettings() {
);
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return Collections.singletonList(executorBuilder());
}

public static ExecutorBuilder<?> executorBuilder() {
return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 32, TimeValue.timeValueSeconds(30L));
}

@Override
public void reload(Settings settings) {
// secure settings should be readable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,31 @@
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

public class AzureBlobStoreContainerTests extends ESBlobStoreContainerTestCase {

private ThreadPool threadPool;

@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("AzureBlobStoreTests", AzureRepositoryPlugin.executorBuilder());
}

@Override
public void tearDown() throws Exception {
super.tearDown();
ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
}

@Override
protected BlobStore newBlobStore() {
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
AzureStorageServiceMock client = new AzureStorageServiceMock();
return new AzureBlobStore(repositoryMetaData, client);
return new AzureBlobStore(repositoryMetaData, client, threadPool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,31 @@
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.ESBlobStoreTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

public class AzureBlobStoreTests extends ESBlobStoreTestCase {

private ThreadPool threadPool;

@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("AzureBlobStoreTests", AzureRepositoryPlugin.executorBuilder());
}

@Override
public void tearDown() throws Exception {
super.tearDown();
ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
}

@Override
protected BlobStore newBlobStore() {
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
AzureStorageServiceMock client = new AzureStorageServiceMock();
return new AzureBlobStore(repositoryMetaData, client);
return new AzureBlobStore(repositoryMetaData, client, threadPool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

protected final NamedXContentRegistry namedXContentRegistry;

private final ThreadPool threadPool;
protected final ThreadPool threadPool;

private static final int BUFFER_SIZE = 4096;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@

public class TestThreadPool extends ThreadPool {

public TestThreadPool(String name) {
this(name, Settings.EMPTY);
public TestThreadPool(String name, ExecutorBuilder<?>... customBuilders) {
this(name, Settings.EMPTY, customBuilders);
}

public TestThreadPool(String name, Settings settings) {
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build());
public TestThreadPool(String name, Settings settings, ExecutorBuilder<?>... customBuilders) {
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build(), customBuilders);
}

}