Skip to content

Commit

Permalink
Add support download latest index metadata from remote (opensearch-pr…
Browse files Browse the repository at this point in the history
…oject#9477)

---------

Signed-off-by: bansvaru <bansvaru@amazon.com>
Signed-off-by: Ivan Brusic <ivan.brusic@flocksafety.com>
  • Loading branch information
linuxpi authored and brusic committed Sep 25, 2023
1 parent 7a0f10e commit f11f0a2
Show file tree
Hide file tree
Showing 5 changed files with 376 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -349,17 +349,13 @@ private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs
}

@Override
public void listBlobsByPrefixInSortedOrder(
String blobNamePrefix,
int limit,
BlobNameSortOrder blobNameSortOrder,
ActionListener<List<BlobMetadata>> listener
) {
public List<BlobMetadata> listBlobsByPrefixInSortedOrder(String blobNamePrefix, int limit, BlobNameSortOrder blobNameSortOrder)
throws IOException {
// As AWS S3 returns list of keys in Lexicographic order, we don't have to fetch all the keys in order to sort them
// We fetch only keys as per the given limit to optimize the fetch. If provided sort order is not Lexicographic,
// we fall-back to default implementation of fetching all the keys and sorting them.
if (blobNameSortOrder != BlobNameSortOrder.LEXICOGRAPHIC) {
super.listBlobsByPrefixInSortedOrder(blobNamePrefix, limit, blobNameSortOrder, listener);
return super.listBlobsByPrefixInSortedOrder(blobNamePrefix, limit, blobNameSortOrder);
} else {
if (limit < 0) {
throw new IllegalArgumentException("limit should not be a negative value");
Expand All @@ -370,9 +366,9 @@ public void listBlobsByPrefixInSortedOrder(
.flatMap(listing -> listing.contents().stream())
.map(s3Object -> new PlainBlobMetadata(s3Object.key().substring(keyPath.length()), s3Object.size()))
.collect(Collectors.toList());
listener.onResponse(blobs.subList(0, Math.min(limit, blobs.size())));
return blobs.subList(0, Math.min(limit, blobs.size()));
} catch (final Exception e) {
listener.onFailure(new IOException("Exception when listing blobs by prefix [" + prefix + "]", e));
throw new IOException("Exception when listing blobs by prefix [" + prefix + "]", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,19 @@ default void listBlobsByPrefixInSortedOrder(
throw new IllegalArgumentException("limit should not be a negative value");
}
try {
List<BlobMetadata> blobNames = new ArrayList<>(listBlobsByPrefix(blobNamePrefix).values());
blobNames.sort(blobNameSortOrder.comparator());
listener.onResponse(blobNames.subList(0, Math.min(blobNames.size(), limit)));
listener.onResponse(listBlobsByPrefixInSortedOrder(blobNamePrefix, limit, blobNameSortOrder));
} catch (Exception e) {
listener.onFailure(e);
}
}

default List<BlobMetadata> listBlobsByPrefixInSortedOrder(String blobNamePrefix, int limit, BlobNameSortOrder blobNameSortOrder)
throws IOException {
if (limit < 0) {
throw new IllegalArgumentException("limit should not be a negative value");
}
List<BlobMetadata> blobNames = new ArrayList<>(listBlobsByPrefix(blobNamePrefix).values());
blobNames.sort(blobNameSortOrder.comparator());
return blobNames.subList(0, Math.min(blobNames.size(), limit));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
Expand All @@ -35,7 +36,9 @@
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -87,7 +90,7 @@ public class RemoteClusterStateService implements Closeable {
);
private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);

private static final String DELIMITER = "__";
public static final String DELIMITER = "__";

private final String nodeId;
private final Supplier<RepositoriesService> repositoriesService;
Expand Down Expand Up @@ -367,4 +370,101 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) {
return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis()));
}

/**
* Fetch latest index metadata from remote cluster state
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return {@code Map<String, IndexMetadata>} latest IndexUUID to IndexMetadata map
*/
public Map<String, IndexMetadata> getLatestIndexMetadata(String clusterName, String clusterUUID) throws IOException {
Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
ClusterMetadataManifest clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID())
: "Corrupt ClusterMetadataManifest found. Cluster UUID mismatch.";
for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.getIndices()) {
IndexMetadata indexMetadata = getIndexMetadata(clusterName, clusterUUID, uploadedIndexMetadata);
remoteIndexMetadata.put(uploadedIndexMetadata.getIndexUUID(), indexMetadata);
}
return remoteIndexMetadata;
}

/**
* Fetch index metadata from remote cluster state
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @param uploadedIndexMetadata {@link UploadedIndexMetadata} contains details about remote location of index metadata
* @return {@link IndexMetadata}
*/
private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, UploadedIndexMetadata uploadedIndexMetadata) {
try {
return INDEX_METADATA_FORMAT.read(
indexMetadataContainer(clusterName, clusterUUID, uploadedIndexMetadata.getIndexUUID()),
uploadedIndexMetadata.getUploadedFilename(),
blobStoreRepository.getNamedXContentRegistry()
);
} catch (IOException e) {
throw new IllegalStateException(
String.format(Locale.ROOT, "Error while downloading IndexMetadata - %s", uploadedIndexMetadata.getUploadedFilename()),
e
);
}
}

/**
* Fetch latest ClusterMetadataManifest from remote state store
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return ClusterMetadataManifest
*/
public ClusterMetadataManifest getLatestClusterMetadataManifest(String clusterName, String clusterUUID) {
String latestManifestFileName = getLatestManifestFileName(clusterName, clusterUUID);
return fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, latestManifestFileName);
}

/**
* Fetch latest ClusterMetadataManifest file from remote state store
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return latest ClusterMetadataManifest filename
*/
private String getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException {
try {
/**
* {@link BlobContainer#listBlobsByPrefixInSortedOrder} will get the latest manifest file
* as the manifest file name generated via {@link RemoteClusterStateService#getManifestFileName} ensures
* when sorted in LEXICOGRAPHIC order the latest uploaded manifest file comes on top.
*/
List<BlobMetadata> manifestFilesMetadata = manifestContainer(clusterName, clusterUUID).listBlobsByPrefixInSortedOrder(
"manifest" + DELIMITER,
1,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
);
if (manifestFilesMetadata != null && !manifestFilesMetadata.isEmpty()) {
return manifestFilesMetadata.get(0).name();
}
} catch (IOException e) {
throw new IllegalStateException("Error while fetching latest manifest file for remote cluster state", e);
}

throw new IllegalStateException(String.format(Locale.ROOT, "Remote Cluster State not found - %s", clusterUUID));
}

/**
* Fetch ClusterMetadataManifest from remote state store
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return ClusterMetadataManifest
*/
private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename)
throws IllegalStateException {
try {
return RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.read(
manifestContainer(clusterName, clusterUUID),
filename,
blobStoreRepository.getNamedXContentRegistry()
);
} catch (IOException e) {
throw new IllegalStateException(String.format(Locale.ROOT, "Error while downloading cluster metadata - %s", filename), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,10 @@ public RepositoryMetadata getMetadata() {
return metadata;
}

public NamedXContentRegistry getNamedXContentRegistry() {
return namedXContentRegistry;
}

public Compressor getCompressor() {
return compressor;
}
Expand Down
Loading

0 comments on commit f11f0a2

Please sign in to comment.