Add Ability to List Child Containers to BlobContainer #42653

Merged
Changes from 11 commits
URLBlobContainer.java
@@ -20,6 +20,7 @@
package org.elasticsearch.common.blobstore.url;

import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
@@ -74,6 +75,11 @@ public Map<String, BlobMetaData> listBlobs() throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

@Override
public Map<String, BlobContainer> children() throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}
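
The URL repository opts out of the new method, but for implementations that do support it, children() composes naturally with the existing listBlobs(). A minimal caller sketch, assuming only the BlobContainer interface touched by this PR; how the root container is obtained is omitted:

```java
import java.io.IOException;

import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;

// Sketch: recursively printing a repository's blob tree via the new children() method.
public final class BlobTreeWalker {
    public static void walk(BlobContainer container) throws IOException {
        for (BlobMetaData blob : container.listBlobs().values()) {
            System.out.println(container.path().buildAsString() + blob.name() + " [" + blob.length() + " bytes]");
        }
        for (BlobContainer child : container.children().values()) {
            walk(child); // descend into each child container
        }
    }
}
```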

/**
* This operation is not supported by URLBlobContainer
*/
AzureBlobContainer.java
@@ -24,6 +24,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
@@ -135,6 +136,16 @@ public Map<String, BlobMetaData> listBlobs() throws IOException {
return listBlobsByPrefix(null);
}

@Override
public Map<String, BlobContainer> children() throws IOException {
final BlobPath path = path();
try {
return blobStore.children(path);
} catch (URISyntaxException | StorageException e) {
throw new IOException("Failed to list children in path [" + path.buildAsString() + "].", e);
}
}

protected String buildKey(String blobName) {
return keyPath + (blobName == null ? "" : blobName);
}
AzureBlobStore.java
@@ -33,7 +33,10 @@
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;

@@ -98,6 +101,11 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix
return service.listBlobsByPrefix(clientName, container, keyPath, prefix);
}

public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException {
return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this))));
}
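
The stream-collect line above pairs each child name with a container rooted one path segment deeper. The same idiom in isolation, with made-up names and a plain String standing in for AzureBlobContainer:

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch of the Function.identity()/Collectors.toMap idiom used by children(BlobPath).
public class ChildrenMapSketch {
    public static void main(String[] args) {
        String basePath = "base/";                     // hypothetical container path
        Set<String> names = Set.of("indices", "meta"); // hypothetical child names
        Map<String, String> children = names.stream()
            .collect(Collectors.toMap(Function.identity(), name -> basePath + name + "/"));
        children.forEach((name, path) -> System.out.println(name + " -> " + path));
    }
}
```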

public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists);
AzureStorageService.java
@@ -29,15 +29,18 @@
import com.microsoft.azure.storage.blob.BlobInputStream;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
@@ -54,7 +57,9 @@
import java.security.InvalidKeyException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static java.util.Collections.emptyMap;
@@ -214,15 +219,40 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, String contai
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
final String blobPath = uri.getPath().substring(1 + container.length() + 1);
final BlobProperties properties = ((CloudBlockBlob) blobItem).getProperties();
final String name = blobPath.substring(keyPath.length());
logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength()));
blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength()));
if (blobItem instanceof CloudBlob) {
final BlobProperties properties = ((CloudBlob) blobItem).getProperties();
final String name = blobPath.substring(keyPath.length());
logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength()));
blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength()));
}
}
});
return Map.copyOf(blobsBuilder);
}

public Set<String> children(String account, String container, BlobPath path) throws URISyntaxException, StorageException {
final var blobsBuilder = new HashSet<String>();
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final String keyPath = path.buildAsString();
final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);

SocketAccess.doPrivilegedVoidException(() -> {
for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath, false, enumBlobListingDetails, null, client.v2().get())) {
if (blobItem instanceof CloudBlobDirectory) {
final URI uri = blobItem.getUri();
logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
// Lastly, we add the length of keyPath to the offset to strip this container's path.
final String uriPath = uri.getPath();
blobsBuilder.add(uriPath.substring(1 + container.length() + 1 + keyPath.length(), uriPath.length() - 1));
Contributor: The comment mentions 1 + container.length() + 1; why do we have keyPath.length here?

Member Author: We have to strip our own path (that of the blob container) as well. I'll amend the comment.
}
}
});
return Set.copyOf(blobsBuilder);
}
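
To make the offsets discussed in the thread above concrete, here is a small standalone sketch; the container name, key path, and child name are made up:

```java
// Sketch of the substring arithmetic in children() above, with made-up values.
// uriPath has the form "/" + container + "/" + keyPath + childName + "/".
public class ChildNameOffsets {
    public static void main(String[] args) {
        String container = "my-container";
        String keyPath = "indices/";                       // this container's path, ends with '/'
        String uriPath = "/my-container/indices/abc123/";  // uri.getPath() of a directory item

        // 1 (leading '/') + container.length() + 1 (separator '/') skips "/my-container/",
        // and keyPath.length() additionally skips this container's own path.
        int start = 1 + container.length() + 1 + keyPath.length();
        // uriPath.length() - 1 drops the directory item's trailing '/'.
        String child = uriPath.substring(start, uriPath.length() - 1);

        System.out.println(child); // prints "abc123"
    }
}
```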

public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
GoogleCloudStorageBlobContainer.java
@@ -19,6 +19,7 @@

package org.elasticsearch.repositories.gcs;

import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStoreException;
@@ -55,6 +56,11 @@ public Map<String, BlobMetaData> listBlobs() throws IOException {
return blobStore.listBlobs(path);
}

@Override
public Map<String, BlobContainer> children() throws IOException {
return blobStore.listChildren(path());
}

@Override
public Map<String, BlobMetaData> listBlobsByPrefix(String prefix) throws IOException {
return blobStore.listBlobsByPrefix(path, prefix);
GoogleCloudStorageBlobStore.java
@@ -142,6 +142,23 @@ Map<String, BlobMetaData> listBlobsByPrefix(String path, String prefix) throws I
return mapBuilder.immutableMap();
}

Map<String, BlobContainer> listChildren(BlobPath path) throws IOException {
final String pathStr = path.buildAsString();
final MapBuilder<String, BlobContainer> mapBuilder = MapBuilder.newMapBuilder();
SocketAccess.doPrivilegedVoidIOException
(() -> client().get(bucketName).list(BlobListOption.currentDirectory(), BlobListOption.prefix(pathStr)).iterateAll().forEach(
blob -> {
if (blob.isDirectory()) {
assert blob.getName().startsWith(pathStr);
final String suffixName = blob.getName().substring(pathStr.length());
if (suffixName.isEmpty() == false) {
mapBuilder.put(suffixName, new GoogleCloudStorageBlobContainer(path.add(suffixName), this));
}
}
}));
return mapBuilder.immutableMap();
}
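
For context on why isDirectory() works here: with BlobListOption.currentDirectory(), the GCS client returns "subdirectories" under the prefix as placeholder blobs whose isDirectory() is true. A minimal sketch against the google-cloud-storage client directly, with a made-up bucket and prefix (not code from this PR):

```java
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.StorageOptions;

// Sketch: listing only the immediate "directories" under a prefix.
public class ListChildrenSketch {
    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        for (Blob blob : storage.list("my-bucket",
                BlobListOption.currentDirectory(),
                BlobListOption.prefix("base/path/")).iterateAll()) {
            if (blob.isDirectory()) {
                // Directory placeholders keep their trailing '/', e.g. "base/path/child/".
                System.out.println("child: " + blob.getName());
            }
        }
    }
}
```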

/**
* Returns true if the blob exists in the specific bucket
*
HdfsBlobContainer.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
@@ -137,11 +138,13 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS

@Override
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable final String prefix) throws IOException {
FileStatus[] files = store.execute(fileContext -> (fileContext.util().listStatus(path,
path -> prefix == null || path.getName().startsWith(prefix))));
Map<String, BlobMetaData> map = new LinkedHashMap<String, BlobMetaData>();
FileStatus[] files = store.execute(fileContext -> fileContext.util().listStatus(path,
path -> prefix == null || path.getName().startsWith(prefix)));
Map<String, BlobMetaData> map = new LinkedHashMap<>();
for (FileStatus file : files) {
map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
if (file.isFile()) {
map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
}
}
return Collections.unmodifiableMap(map);
}
@@ -151,6 +154,19 @@ public Map<String, BlobMetaData> listBlobs() throws IOException {
return listBlobsByPrefix(null);
}

@Override
public Map<String, BlobContainer> children() throws IOException {
FileStatus[] files = store.execute(fileContext -> fileContext.util().listStatus(path));
Map<String, BlobContainer> map = new LinkedHashMap<>();
for (FileStatus file : files) {
if (file.isDirectory()) {
final String name = file.getPath().getName();
map.put(name, new HdfsBlobContainer(path().add(name), store, new Path(path, name), bufferSize, securityContext));
}
}
return Collections.unmodifiableMap(map);
}
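
The same file/directory split drives both changes in this file: listBlobsByPrefix now skips directories, and children() keeps only them. A java.nio analogue of that split, with a hypothetical local path (not HDFS code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Sketch: regular files correspond to blobs, directories to child containers.
public class BlobChildSplit {
    public static void main(String[] args) throws IOException {
        Path base = Paths.get("/tmp/repo"); // hypothetical repository root
        try (Stream<Path> entries = Files.list(base)) {
            entries.forEach(p -> System.out.println(
                (Files.isDirectory(p) ? "child container: " : "blob: ") + p.getFileName()));
        }
    }
}
```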

/**
* Exists to wrap underlying InputStream methods that might make socket connections in
* doPrivileged blocks. This is due to the way that hdfs client libraries might open
HdfsRepositoryTests.java (new file)
@@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.hdfs;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.bootstrap.JavaVersion;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;

import java.util.Collection;

import static org.hamcrest.Matchers.equalTo;

@ThreadLeakFilters(filters = HdfsClientThreadLeakFilter.class)
public class HdfsRepositoryTests extends AbstractThirdPartyRepositoryTestCase {
Member Author: Even though it's not technically third party, I figured it would be good to have this running for HDFS as well, right?

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(HdfsPlugin.class);
}

@Override
protected SecureSettings credentials() {
return new MockSecureSettings();
}

@Override
protected void createRepository(String repoName) {
assumeFalse("https://github.com/elastic/elasticsearch/issues/31498", JavaVersion.current().equals(JavaVersion.parse("11")));
AcknowledgedResponse putRepositoryResponse = client().admin().cluster().preparePutRepository(repoName)
.setType("hdfs")
.setSettings(Settings.builder()
.put("uri", "hdfs:///")
.put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName())
.put("path", "foo")
.put("chunk_size", randomIntBetween(100, 1000) + "k")
.put("compress", randomBoolean())
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
}
}
4 changes: 1 addition & 3 deletions plugins/repository-s3/build.gradle
@@ -164,7 +164,7 @@ if (useFixture) {
def minioAddress = {
int minioPort = postProcessFixture.ext."test.fixtures.minio-fixture.tcp.9000"
assert minioPort > 0
return 'http://127.0.0.1:' + minioPort
'http://127.0.0.1:' + minioPort
}

File minioAddressFile = new File(project.buildDir, 'generated-resources/s3Fixture.address')
@@ -173,15 +173,13 @@
// and pass its name instead.
task writeMinioAddress {
dependsOn tasks.bundlePlugin, tasks.postProcessFixture
outputs.file(minioAddressFile)
Member Author: Adding these outputs and inputs was a mistake; repeated runs against minio fail because of them, since we need to update the file when repeatedly starting Minio.

doLast {
file(minioAddressFile).text = "${ -> minioAddress.call() }"
}
}

thirdPartyTest {
dependsOn writeMinioAddress
inputs.file(minioAddressFile)
systemProperty 'test.s3.endpoint', minioAddressFile.name
}
