Skip to content

Commit

Permalink
Prevent loading ephemeral objects in restore flow
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed May 27, 2024
1 parent e12131d commit f4f773f
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ public void testRemoteCleanupOnlyAfter10Updates() throws Exception {

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestClusterState(
cluster().getClusterName(),
getClusterState().metadata().clusterUUID()
getClusterState().metadata().clusterUUID(),
false
).getMetadata().getIndices();
assertEquals(replicaCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublish
}

if (applyFullState == true) {
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId());
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId(), true);
logger.debug("Downloaded full cluster state [{}]", clusterState);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public String getBlobFileName() {
generateBlobFileName();
}
String[] pathTokens = getFullBlobName().split(PATH_DELIMITER);
return getFullBlobName().split(PATH_DELIMITER)[pathTokens.length - 1];
return pathTokens[pathTokens.length - 1];
}

public abstract String generateBlobFileName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ BlobStore getBlobStore() {
* @param clusterName name of the cluster
* @return {@link IndexMetadata}
*/
public ClusterState getLatestClusterState(String clusterName, String clusterUUID) throws IOException {
public ClusterState getLatestClusterState(String clusterName, String clusterUUID, boolean includeEphemeral) throws IOException {
start();
Optional<ClusterMetadataManifest> clusterMetadataManifest = remoteManifestManager.getLatestClusterMetadataManifest(
clusterName,
Expand All @@ -815,7 +815,7 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID
);
}

return getClusterStateForManifest(clusterName, clusterMetadataManifest.get(), nodeId);
return getClusterStateForManifest(clusterName, clusterMetadataManifest.get(), nodeId, includeEphemeral);
}

private ClusterState readClusterStateInParallel(
Expand Down Expand Up @@ -1041,7 +1041,7 @@ private ClusterState readClusterStateInParallel(
.build();
}

public ClusterState getClusterStateForManifest(String clusterName, ClusterMetadataManifest manifest, String localNodeId) throws IOException {
public ClusterState getClusterStateForManifest(String clusterName, ClusterMetadataManifest manifest, String localNodeId, boolean includeEphemeral) throws IOException {
return readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
Expand All @@ -1053,9 +1053,9 @@ public ClusterState getClusterStateForManifest(String clusterName, ClusterMetada
manifest.getCoordinationMetadata() != null,
manifest.getSettingsMetadata() != null,
manifest.getTemplatesMetadata() != null,
manifest.getDiscoveryNodesMetadata() != null,
manifest.getClusterBlocksMetadata() != null,
manifest.getIndicesRouting()
includeEphemeral && manifest.getDiscoveryNodesMetadata() != null,
includeEphemeral && manifest.getClusterBlocksMetadata() != null,
includeEphemeral ? manifest.getIndicesRouting() : Collections.emptyList()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public RemoteRestoreResult restore(
throw new IllegalArgumentException("clusterUUID to restore from should be different from current cluster UUID");
}
logger.info("Restoring cluster state from remote store from cluster UUID : [{}]", restoreClusterUUID);
remoteState = remoteClusterStateService.getLatestClusterState(currentState.getClusterName().value(), restoreClusterUUID);
remoteState = remoteClusterStateService.getLatestClusterState(currentState.getClusterName().value(), restoreClusterUUID, false);
remoteState.getMetadata().getIndices().values().forEach(indexMetadata -> {
indexMetadataMap.put(indexMetadata.getIndex().getName(), new Tuple<>(true, indexMetadata));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@

package org.opensearch.repositories.blobstore;

import java.util.concurrent.CompletableFuture;
import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
Expand All @@ -44,11 +50,9 @@
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.io.Streams;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -63,22 +67,6 @@
import org.opensearch.gateway.CorruptStateException;
import org.opensearch.index.store.exception.ChecksumCombinationException;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum;

/**
* Snapshot metadata file format used in v2.0 and above
Expand Down Expand Up @@ -129,16 +117,6 @@ public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunct
return deserialize(blobName, namedXContentRegistry, Streams.readFully(blobContainer.readBlob(blobName)));
}

public void readAsync(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry, ExecutorService executorService, ActionListener<T> listener) throws IOException {
executorService.execute(() -> {
try {
listener.onResponse(read(blobContainer, name, namedXContentRegistry));
} catch (Exception e) {
listener.onFailure(e);
}
});
}

public String blobName(String name) {
return String.format(Locale.ROOT, getBlobNameFormat(), name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ public void testReadLatestMetadataManifestSuccessButNoIndexMetadata() throws IOE

remoteClusterStateService.start();
assertEquals(
remoteClusterStateService.getLatestClusterState(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
remoteClusterStateService.getLatestClusterState(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), false)
.getMetadata()
.getIndices()
.size(),
Expand Down Expand Up @@ -995,7 +995,8 @@ public void testReadLatestMetadataManifestSuccessButIndexMetadataFetchIOExceptio
IllegalStateException.class,
() -> remoteClusterStateService.getLatestClusterState(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
clusterState.metadata().clusterUUID(),
false
).getMetadata().getIndices()
);
assertEquals(e.getMessage(), "Error while downloading IndexMetadata - " + uploadedIndexMetadata.getUploadedFilename());
Expand Down Expand Up @@ -1063,7 +1064,8 @@ public void testReadGlobalMetadata() throws IOException {

ClusterState newClusterState = remoteClusterStateService.getLatestClusterState(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
clusterState.metadata().clusterUUID(),
false
);

assertTrue(Metadata.isGlobalStateEquals(newClusterState.getMetadata(), expactedMetadata));
Expand Down Expand Up @@ -1108,7 +1110,8 @@ public void testReadGlobalMetadataIOException() throws IOException {
IllegalStateException.class,
() -> remoteClusterStateService.getLatestClusterState(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
clusterState.metadata().clusterUUID(),
false
)
);
assertEquals(e.getMessage(), "Error while downloading Global Metadata - " + globalIndexMetadataName);
Expand Down Expand Up @@ -1147,7 +1150,8 @@ public void testReadLatestIndexMetadataSuccess() throws IOException {

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestClusterState(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
clusterState.metadata().clusterUUID(),
false
).getMetadata().getIndices();

assertEquals(indexMetadataMap.size(), 1);
Expand Down

0 comments on commit f4f773f

Please sign in to comment.