From 0d343a8429ae3d98cb4b2a9b387a096d04d5eaaf Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 5 Jun 2019 17:03:14 +0200 Subject: [PATCH] Add Ability to List Child Containers to BlobContainer (#42653) * Add Ability to List Child Containers to BlobContainer * This is a prerequisite of #42189 --- .../blobstore/url/URLBlobContainer.java | 6 + .../azure/AzureBlobContainer.java | 11 ++ .../repositories/azure/AzureBlobStore.java | 8 ++ .../azure/AzureStorageService.java | 39 +++++- .../gcs/GoogleCloudStorageBlobContainer.java | 6 + .../gcs/GoogleCloudStorageBlobStore.java | 17 +++ .../repositories/hdfs/HdfsBlobContainer.java | 24 +++- .../hdfs/HdfsRepositoryTests.java | 61 +++++++++ plugins/repository-s3/build.gradle | 2 +- .../repositories/s3/S3BlobContainer.java | 57 ++++++++- .../repositories/s3/S3Repository.java | 6 - .../s3/S3RepositoryThirdPartyTests.java | 20 ++- .../common/blobstore/BlobContainer.java | 10 ++ .../common/blobstore/fs/FsBlobContainer.java | 16 ++- .../blobstore/BlobStoreRepository.java | 9 +- .../mockstore/BlobContainerWrapper.java | 5 + .../snapshots/mockstore/MockRepository.java | 10 ++ .../AbstractThirdPartyRepositoryTestCase.java | 118 +++++++++++++++++- 18 files changed, 401 insertions(+), 24 deletions(-) create mode 100644 plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsRepositoryTests.java diff --git a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java index 91676037043e3..697b443c93abf 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.blobstore.url; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; @@ -74,6 +75,11 @@ public Map listBlobs() throws IOException { throw new UnsupportedOperationException("URL repository doesn't support this operation"); } + @Override + public Map children() throws IOException { + throw new UnsupportedOperationException("URL repository doesn't support this operation"); + } + /** * This operation is not supported by URLBlobContainer */ diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 9cdcff56649c6..513bcf50abc44 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; @@ -169,6 +170,16 @@ public Map listBlobs() throws IOException { return listBlobsByPrefix(null); } + 
@Override + public Map children() throws IOException { + final BlobPath path = path(); + try { + return blobStore.children(path); + } catch (URISyntaxException | StorageException e) { + throw new IOException("Failed to list children in path [" + path.buildAsString() + "].", e); + } + } + protected String buildKey(String blobName) { return keyPath + (blobName == null ? "" : blobName); } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index 70e6f40e6f7e6..d2d4ee66d1a58 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -34,7 +34,10 @@ import java.io.InputStream; import java.net.URISyntaxException; import java.nio.file.FileAlreadyExistsException; +import java.util.Collections; import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; @@ -97,6 +100,11 @@ public Map listBlobsByPrefix(String keyPath, String prefix return service.listBlobsByPrefix(clientName, container, keyPath, prefix); } + public Map children(BlobPath path) throws URISyntaxException, StorageException { + return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect( + Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this)))); + } + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws URISyntaxException, StorageException, FileAlreadyExistsException { service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index ee00551849d51..f153aa3031c38 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -29,8 +29,10 @@ import com.microsoft.azure.storage.blob.BlobInputStream; import com.microsoft.azure.storage.blob.BlobListingDetails; import com.microsoft.azure.storage.blob.BlobProperties; +import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; import com.microsoft.azure.storage.blob.CloudBlockBlob; import com.microsoft.azure.storage.blob.DeleteSnapshotsOption; import com.microsoft.azure.storage.blob.ListBlobItem; @@ -39,6 +41,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.Tuple; @@ -54,8 +57,11 @@ import java.net.URISyntaxException; import java.nio.file.FileAlreadyExistsException; import java.security.InvalidKeyException; +import java.util.Collections; import java.util.EnumSet; +import 
java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.function.Supplier; import static java.util.Collections.emptyMap; @@ -209,15 +215,40 @@ public Map listBlobsByPrefix(String account, String contai // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/ // this requires 1 + container.length() + 1, with each 1 corresponding to one of the / final String blobPath = uri.getPath().substring(1 + container.length() + 1); - final BlobProperties properties = ((CloudBlockBlob) blobItem).getProperties(); - final String name = blobPath.substring(keyPath.length()); - logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength())); - blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength())); + if (blobItem instanceof CloudBlob) { + final BlobProperties properties = ((CloudBlob) blobItem).getProperties(); + final String name = blobPath.substring(keyPath.length()); + logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength())); + blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength())); + } } }); return blobsBuilder.immutableMap(); } + public Set children(String account, String container, BlobPath path) throws URISyntaxException, StorageException { + final Set blobsBuilder = new HashSet<>(); + final Tuple> client = client(account); + final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); + final String keyPath = path.buildAsString(); + final EnumSet enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA); + + SocketAccess.doPrivilegedVoidException(() -> { + for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath, false, enumBlobListingDetails, null, client.v2().get())) { + if (blobItem instanceof CloudBlobDirectory) { + final URI uri = blobItem.getUri(); + logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri)); + // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/ + // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /. + // Lastly, we add the length of keyPath to the offset to strip this container's path. 
+ final String uriPath = uri.getPath(); + blobsBuilder.add(uriPath.substring(1 + container.length() + 1 + keyPath.length(), uriPath.length() - 1)); + } + } + }); + return Collections.unmodifiableSet(blobsBuilder); + } + public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws URISyntaxException, StorageException, FileAlreadyExistsException { diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java index fb81a5c90039f..a281d83eb4171 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories.gcs; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStoreException; @@ -55,6 +56,11 @@ public Map listBlobs() throws IOException { return blobStore.listBlobs(path); } + @Override + public Map children() throws IOException { + return blobStore.listChildren(path()); + } + @Override public Map listBlobsByPrefix(String prefix) throws IOException { return blobStore.listBlobsByPrefix(path, prefix); diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index c90d49bd73d15..9443bfcf6b9e6 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -142,6 +142,23 @@ Map listBlobsByPrefix(String path, String prefix) throws I return mapBuilder.immutableMap(); } + Map listChildren(BlobPath path) throws IOException { + final String pathStr = path.buildAsString(); + final MapBuilder mapBuilder = MapBuilder.newMapBuilder(); + SocketAccess.doPrivilegedVoidIOException + (() -> client().get(bucketName).list(BlobListOption.currentDirectory(), BlobListOption.prefix(pathStr)).iterateAll().forEach( + blob -> { + if (blob.isDirectory()) { + assert blob.getName().startsWith(pathStr); + final String suffixName = blob.getName().substring(pathStr.length()); + if (suffixName.isEmpty() == false) { + mapBuilder.put(suffixName, new GoogleCloudStorageBlobContainer(path.add(suffixName), this)); + } + } + })); + return mapBuilder.immutableMap(); + } + /** * Returns true if the blob exists in the specific bucket * diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index e9b45a9b52e70..fcf303dfc0957 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Path; import org.elasticsearch.common.Nullable; +import 
org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.fs.FsBlobContainer; @@ -137,11 +138,13 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS @Override public Map listBlobsByPrefix(@Nullable final String prefix) throws IOException { - FileStatus[] files = store.execute(fileContext -> (fileContext.util().listStatus(path, - path -> prefix == null || path.getName().startsWith(prefix)))); - Map map = new LinkedHashMap(); + FileStatus[] files = store.execute(fileContext -> fileContext.util().listStatus(path, + path -> prefix == null || path.getName().startsWith(prefix))); + Map map = new LinkedHashMap<>(); for (FileStatus file : files) { - map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen())); + if (file.isFile()) { + map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen())); + } } return Collections.unmodifiableMap(map); } @@ -151,6 +154,19 @@ public Map listBlobs() throws IOException { return listBlobsByPrefix(null); } + @Override + public Map children() throws IOException { + FileStatus[] files = store.execute(fileContext -> fileContext.util().listStatus(path)); + Map map = new LinkedHashMap<>(); + for (FileStatus file : files) { + if (file.isDirectory()) { + final String name = file.getPath().getName(); + map.put(name, new HdfsBlobContainer(path().add(name), store, new Path(path, name), bufferSize, securityContext)); + } + } + return Collections.unmodifiableMap(map); + } + /** * Exists to wrap underlying InputStream methods that might make socket connections in * doPrivileged blocks. This is due to the way that hdfs client libraries might open diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsRepositoryTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsRepositoryTests.java new file mode 100644 index 0000000000000..e34f290a8e299 --- /dev/null +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsRepositoryTests.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.repositories.hdfs; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.bootstrap.JavaVersion; +import org.elasticsearch.common.settings.MockSecureSettings; +import org.elasticsearch.common.settings.SecureSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase; + +import java.util.Collection; + +import static org.hamcrest.Matchers.equalTo; + +@ThreadLeakFilters(filters = HdfsClientThreadLeakFilter.class) +public class HdfsRepositoryTests extends AbstractThirdPartyRepositoryTestCase { + + @Override + protected Collection> getPlugins() { + return pluginList(HdfsPlugin.class); + } + + @Override + protected SecureSettings credentials() { + return new MockSecureSettings(); + } + + @Override + protected void createRepository(String repoName) { + assumeFalse("https://github.com/elastic/elasticsearch/issues/31498", JavaVersion.current().equals(JavaVersion.parse("11"))); + AcknowledgedResponse putRepositoryResponse = client().admin().cluster().preparePutRepository(repoName) + .setType("hdfs") + .setSettings(Settings.builder() + .put("uri", "hdfs:///") + .put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName()) + .put("path", "foo") + .put("chunk_size", randomIntBetween(100, 1000) + "k") + .put("compress", randomBoolean()) + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + } +} diff --git a/plugins/repository-s3/build.gradle b/plugins/repository-s3/build.gradle index e32eefa505583..1126381560c26 100644 --- a/plugins/repository-s3/build.gradle +++ b/plugins/repository-s3/build.gradle @@ -163,7 +163,7 @@ if (useFixture) { def minioAddress = { int minioPort = postProcessFixture.ext."test.fixtures.minio-fixture.tcp.9000" assert minioPort > 0 - return 'http://127.0.0.1:' + minioPort + 'http://127.0.0.1:' + minioPort } File minioAddressFile = new File(project.buildDir, 'generated-resources/s3Fixture.address') diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index c057d330da540..41888b8d7e6b3 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -25,6 +25,7 @@ import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.MultiObjectDeleteException; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectMetadata; @@ -38,6 +39,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStoreException; @@ -202,12 +204,15 @@ public Map listBlobsByPrefix(@Nullable String blobNamePref final ObjectListing finalPrevListing = prevListing; list = 
SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing)); } else { + final ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); + listObjectsRequest.setBucketName(blobStore.bucket()); + listObjectsRequest.setDelimiter("/"); if (blobNamePrefix != null) { - list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(blobStore.bucket(), - buildKey(blobNamePrefix))); + listObjectsRequest.setPrefix(buildKey(blobNamePrefix)); } else { - list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(blobStore.bucket(), keyPath)); + listObjectsRequest.setPrefix(keyPath); } + list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest)); } for (final S3ObjectSummary summary : list.getObjectSummaries()) { final String name = summary.getKey().substring(keyPath.length()); @@ -230,6 +235,52 @@ public Map listBlobs() throws IOException { return listBlobsByPrefix(null); } + @Override + public Map children() throws IOException { + try (AmazonS3Reference clientReference = blobStore.clientReference()) { + ObjectListing prevListing = null; + final var entries = new ArrayList>(); + while (true) { + ObjectListing list; + if (prevListing != null) { + final ObjectListing finalPrevListing = prevListing; + list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing)); + } else { + final ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); + listObjectsRequest.setBucketName(blobStore.bucket()); + listObjectsRequest.setPrefix(keyPath); + listObjectsRequest.setDelimiter("/"); + list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest)); + } + for (final String summary : list.getCommonPrefixes()) { + final String name = summary.substring(keyPath.length()); + if (name.isEmpty() == false) { + // Stripping the trailing slash off of the common prefix + final String last = name.substring(0, name.length() - 1); + final BlobPath path = path().add(last); + entries.add(entry(last, blobStore.blobContainer(path))); + } + } + assert list.getObjectSummaries().stream().noneMatch(s -> { + for (String commonPrefix : list.getCommonPrefixes()) { + if (s.getKey().substring(keyPath.length()).startsWith(commonPrefix)) { + return true; + } + } + return false; + }) : "Response contained children for listed common prefixes."; + if (list.isTruncated()) { + prevListing = list; + } else { + break; + } + } + return Maps.ofEntries(entries); + } catch (final AmazonClientException e) { + throw new IOException("Exception when listing children of [" + path().buildAsString() + ']', e); + } + } + private String buildKey(String blobName) { return keyPath + blobName; } diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index e8d8c6d27ad10..269bb4f22b9ae 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -226,12 +226,6 @@ protected S3BlobStore createBlobStore() { return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, repositoryMetaData); } - // only use for testing - @Override - protected BlobStore blobStore() { - return super.blobStore(); - } - // only use for testing @Override protected BlobStore getBlobStore() { diff 
--git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java index 88e293575488f..6fd716328f370 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -19,6 +19,8 @@ package org.elasticsearch.repositories.s3; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.SecureSettings; import org.elasticsearch.common.settings.Settings; @@ -28,6 +30,8 @@ import java.io.IOException; import java.util.Collection; +import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.blankOrNullString; import static org.hamcrest.Matchers.equalTo; @@ -56,7 +60,7 @@ protected SecureSettings credentials() { protected void createRepository(String repoName) { Settings.Builder settings = Settings.builder() .put("bucket", System.getProperty("test.s3.bucket")) - .put("base_path", System.getProperty("test.s3.base", "/")); + .put("base_path", System.getProperty("test.s3.base", "testpath")); final String endpointPath = System.getProperty("test.s3.endpoint"); if (endpointPath != null) { try { @@ -70,4 +74,18 @@ protected void createRepository(String repoName) { .setSettings(settings).get(); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); } + + @Override + protected void assertBlobsByPrefix(BlobPath path, String prefix, Map blobs) throws Exception { + // AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that + // to become consistent. + assertBusy(() -> super.assertBlobsByPrefix(path, prefix, blobs), 10L, TimeUnit.MINUTES); + } + + @Override + protected void assertChildren(BlobPath path, Collection children) throws Exception { + // AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that + // to become consistent. + assertBusy(() -> super.assertChildren(path, children), 10L, TimeUnit.MINUTES); + } } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 9c1bacb51bac7..b8f811295ed8c 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -158,6 +158,16 @@ default void deleteBlobIgnoringIfNotExists(String blobName) throws IOException { */ Map listBlobs() throws IOException; + /** + * Lists all child containers under this container. A child container is defined as a container whose {@link #path()} method returns + * a path that has this containers {@link #path()} return as its prefix and has one more path element than the current + * container's path. + * + * @return Map of name of the child container to child container + * @throws IOException on failure to list child containers + */ + Map children() throws IOException; + /** * Lists all blobs in the container that match the specified prefix. 
* diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index bab984bd85c74..a916515da9e0b 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.blobstore.fs; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; @@ -73,9 +74,22 @@ public Map listBlobs() throws IOException { return listBlobsByPrefix(null); } + @Override + public Map children() throws IOException { + Map builder = new HashMap<>(); + try (DirectoryStream stream = Files.newDirectoryStream(path)) { + for (Path file : stream) { + if (Files.isDirectory(file)) { + final String name = file.getFileName().toString(); + builder.put(name, new FsBlobContainer(blobStore, path().add(name), file)); + } + } + } + return unmodifiableMap(builder); + } + @Override public Map listBlobsByPrefix(String blobNamePrefix) throws IOException { - // If we get duplicate files we should just take the last entry Map builder = new HashMap<>(); blobNamePrefix = blobNamePrefix == null ? "" : blobNamePrefix; diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 5e11ab218a8d2..449722d03fc8c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -285,6 +285,10 @@ protected void doClose() { } } + public ThreadPool threadPool() { + return threadPool; + } + // package private, only use for testing BlobContainer getBlobContainer() { return blobContainer.get(); @@ -316,9 +320,10 @@ protected BlobContainer blobContainer() { } /** - * maintains single lazy instance of {@link BlobStore} + * Maintains single lazy instance of {@link BlobStore}. + * Public for testing. 
*/ - protected BlobStore blobStore() { + public BlobStore blobStore() { assertSnapshotOrGenericThread(); BlobStore store = blobStore.get(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java index 5666869a1aa0b..935ae9f51b6c5 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java @@ -74,6 +74,11 @@ public Map listBlobs() throws IOException { return delegate.listBlobs(); } + @Override + public Map children() throws IOException { + return delegate.children(); + } + @Override public Map listBlobsByPrefix(String blobNamePrefix) throws IOException { return delegate.listBlobsByPrefix(blobNamePrefix); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 9ce111e1d3011..6879406d13a40 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -52,6 +52,7 @@ import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -338,6 +339,15 @@ public Map listBlobs() throws IOException { return super.listBlobs(); } + @Override + public Map children() throws IOException { + final Map res = new HashMap<>(); + for (Map.Entry entry : super.children().entrySet()) { + res.put(entry.getKey(), new MockBlobContainer(entry.getValue())); + } + return res; + } + @Override public Map listBlobsByPrefix(String blobNamePrefix) throws IOException { maybeIOExceptionOrBlock(blobNamePrefix); diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java index 90c399a5af6c7..586f332ff078d 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java @@ -18,12 +18,33 @@ */ package org.elasticsearch.repositories; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.settings.SecureSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESSingleNodeTestCase; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; + +import 
static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -41,10 +62,34 @@ protected Settings nodeSettings() { protected abstract void createRepository(String repoName); - - public void testCreateSnapshot() { + @Override + public void setUp() throws Exception { + super.setUp(); createRepository("test-repo"); + final BlobStoreRepository repo = getRepository(); + final PlainActionFuture future = PlainActionFuture.newFuture(); + repo.threadPool().generic().execute(new ActionRunnable<>(future) { + @Override + protected void doRun() throws Exception { + deleteContents(repo.blobStore().blobContainer(repo.basePath())); + future.onResponse(null); + } + }); + future.actionGet(); + assertChildren(repo.basePath(), Collections.emptyList()); + } + private static void deleteContents(BlobContainer container) throws IOException { + final List toDelete = new ArrayList<>(); + for (Map.Entry child : container.children().entrySet()) { + deleteContents(child.getValue()); + toDelete.add(child.getKey()); + } + toDelete.addAll(container.listBlobs().keySet()); + container.deleteBlobsIgnoringIfNotExists(toDelete); + } + + public void testCreateSnapshot() { createIndex("test-idx-1"); createIndex("test-idx-2"); createIndex("test-idx-3"); @@ -86,6 +131,75 @@ public void testCreateSnapshot() { .prepareDeleteSnapshot("test-repo", snapshotName) .get() .isAcknowledged()); + } + + public void testListChildren() throws Exception { + final BlobStoreRepository repo = getRepository(); + final PlainActionFuture future = PlainActionFuture.newFuture(); + final Executor genericExec = repo.threadPool().generic(); + final int testBlobLen = randomIntBetween(1, 100); + genericExec.execute(new ActionRunnable<>(future) { + @Override + protected void doRun() throws Exception { + final BlobStore blobStore = repo.blobStore(); + blobStore.blobContainer(repo.basePath().add("foo")) + .writeBlob("nested-blob", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false); + blobStore.blobContainer(repo.basePath().add("foo").add("nested")) + .writeBlob("bar", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false); + blobStore.blobContainer(repo.basePath().add("foo").add("nested2")) + .writeBlob("blub", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false); + future.onResponse(null); + } + }); + future.actionGet(); + assertChildren(repo.basePath(), Collections.singleton("foo")); + assertBlobsByPrefix(repo.basePath(), "fo", Collections.emptyMap()); + assertChildren(repo.basePath().add("foo"), List.of("nested", "nested2")); + assertBlobsByPrefix(repo.basePath().add("foo"), "nest", + Collections.singletonMap("nested-blob", new PlainBlobMetaData("nested-blob", testBlobLen))); + assertChildren(repo.basePath().add("foo").add("nested"), Collections.emptyList()); + } + + protected void assertBlobsByPrefix(BlobPath path, String prefix, Map blobs) throws Exception { + final PlainActionFuture> future = PlainActionFuture.newFuture(); + final BlobStoreRepository repository = getRepository(); + repository.threadPool().generic().execute(new ActionRunnable<>(future) { + @Override + protected void doRun() throws Exception { + final BlobStore blobStore = repository.blobStore(); + future.onResponse(blobStore.blobContainer(path).listBlobsByPrefix(prefix)); + } + }); + Map foundBlobs = future.actionGet(); + if (blobs.isEmpty()) { + 
assertThat(foundBlobs.keySet(), empty()); + } else { + assertThat(foundBlobs.keySet(), containsInAnyOrder(blobs.keySet().toArray(Strings.EMPTY_ARRAY))); + for (Map.Entry entry : foundBlobs.entrySet()) { + assertEquals(entry.getValue().length(), blobs.get(entry.getKey()).length()); + } + } + } + + protected void assertChildren(BlobPath path, Collection children) throws Exception { + final PlainActionFuture> future = PlainActionFuture.newFuture(); + final BlobStoreRepository repository = getRepository(); + repository.threadPool().generic().execute(new ActionRunnable<>(future) { + @Override + protected void doRun() throws Exception { + final BlobStore blobStore = repository.blobStore(); + future.onResponse(blobStore.blobContainer(path).children().keySet()); + } + }); + Set foundChildren = future.actionGet(); + if (children.isEmpty()) { + assertThat(foundChildren, empty()); + } else { + assertThat(foundChildren, containsInAnyOrder(children.toArray(Strings.EMPTY_ARRAY))); + } + } + private BlobStoreRepository getRepository() { + return (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo"); } }
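
For readers unfamiliar with the new API, here is a minimal, hypothetical sketch of how the added `BlobContainer#children()` method composes with the existing `listBlobs()` to walk a repository's container tree. It mirrors the recursive `deleteContents` helper this patch adds to `AbstractThirdPartyRepositoryTestCase`; the `BlobTreeWalker` class and its `walk` method are illustrative names only and are not part of the change.

```java
import java.io.IOException;
import java.util.Map;

import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;

/**
 * Hypothetical helper (not part of the patch above): recursively walks a
 * container tree via the new BlobContainer#children() API, using the same
 * traversal pattern as AbstractThirdPartyRepositoryTestCase#deleteContents.
 */
public final class BlobTreeWalker {

    private BlobTreeWalker() {}

    /**
     * Prints every blob under {@code container}, prefixing each name with the
     * container's path, then descends into each child container.
     */
    public static void walk(BlobContainer container) throws IOException {
        final String prefix = container.path().buildAsString();
        // listBlobs() returns only the blobs stored directly in this container ...
        for (Map.Entry<String, BlobMetaData> blob : container.listBlobs().entrySet()) {
            System.out.println(prefix + blob.getKey() + " (" + blob.getValue().length() + " bytes)");
        }
        // ... while children() returns the directly nested containers, one path element deeper.
        for (BlobContainer child : container.children().values()) {
            walk(child);
        }
    }
}
```

A traversal like this terminates cleanly because the patch also makes the per-container listings non-recursive where they previously were not: the HDFS implementation now only reports plain files in `listBlobsByPrefix` (`file.isFile()`), and the Azure implementation only reports actual blobs (`blobItem instanceof CloudBlob`), with directories surfaced exclusively through `children()`.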