Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.11] fix stale remote cluster uuid state not purged from remote #10446

Merged
merged 1 commit into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ public class RemoteClusterStateService implements Closeable {
Property.Final
);

private static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state";
private static final String INDEX_PATH_TOKEN = "index";
private static final String MANIFEST_PATH_TOKEN = "manifest";
private static final String MANIFEST_FILE_PREFIX = "manifest";
private static final String INDEX_METADATA_FILE_PREFIX = "metadata";
public static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state";
public static final String INDEX_PATH_TOKEN = "index";
public static final String MANIFEST_PATH_TOKEN = "manifest";
public static final String MANIFEST_FILE_PREFIX = "manifest";
public static final String INDEX_METADATA_FILE_PREFIX = "metadata";

private final String nodeId;
private final Supplier<RepositoriesService> repositoriesService;
Expand Down Expand Up @@ -385,13 +385,20 @@ private void writeIndexMetadataAsync(
@Nullable
public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest)
throws IOException {
assert clusterState != null : "Last accepted cluster state is not set";
if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
logger.error("Local node is not elected cluster manager. Exiting");
return null;
}
assert clusterState != null : "Last accepted cluster state is not set";
assert previousManifest != null : "Last cluster metadata manifest is not set";
return uploadManifest(clusterState, previousManifest.getIndices(), previousManifest.getPreviousClusterUUID(), true);
ClusterMetadataManifest committedManifest = uploadManifest(
clusterState,
previousManifest.getIndices(),
previousManifest.getPreviousClusterUUID(),
true
);
deleteStaleClusterUUIDs(clusterState, committedManifest);
return committedManifest;
}

@Override
Expand Down Expand Up @@ -719,30 +726,42 @@ private boolean isInvalidClusterUUID(ClusterMetadataManifest manifest) {
}

/**
* Fetch latest ClusterMetadataManifest file from remote state store
* Fetch ClusterMetadataManifest files from remote state store in order
*
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return latest ClusterMetadataManifest filename
* @param limit maximum number of manifest files to fetch
* @return metadata of at most {@code limit} manifest files, latest first
*/
private Optional<String> getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException {
private List<BlobMetadata> getManifestFileNames(String clusterName, String clusterUUID, int limit) throws IllegalStateException {
try {
/**
* {@link BlobContainer#listBlobsByPrefixInSortedOrder} will get the latest manifest file
* {@link BlobContainer#listBlobsByPrefixInSortedOrder} will list the latest manifest file first
* as the manifest file name generated via {@link RemoteClusterStateService#getManifestFileName} ensures
* when sorted in LEXICOGRAPHIC order the latest uploaded manifest file comes on top.
*/
List<BlobMetadata> manifestFilesMetadata = manifestContainer(clusterName, clusterUUID).listBlobsByPrefixInSortedOrder(
return manifestContainer(clusterName, clusterUUID).listBlobsByPrefixInSortedOrder(
MANIFEST_FILE_PREFIX + DELIMITER,
1,
limit,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
);
if (manifestFilesMetadata != null && !manifestFilesMetadata.isEmpty()) {
return Optional.of(manifestFilesMetadata.get(0).name());
}
} catch (IOException e) {
throw new IllegalStateException("Error while fetching latest manifest file for remote cluster state", e);
}
}

/**
 * Looks up the name of the most recent ClusterMetadataManifest file in the remote state store.
 *
 * @param clusterName name of the cluster
 * @param clusterUUID uuid of cluster state to refer to in remote
 * @return name of the latest manifest file, or an empty Optional when the listing returns nothing
 * @throws IllegalStateException propagated from {@code getManifestFileNames} when listing the manifest files fails
 */
private Optional<String> getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException {
    // The listing is ordered latest-first, so asking for a single entry yields the newest manifest.
    final List<BlobMetadata> newestManifest = getManifestFileNames(clusterName, clusterUUID, 1);
    if (newestManifest == null || newestManifest.isEmpty()) {
        logger.info("No manifest file present in remote store for cluster name: {}, cluster UUID: {}", clusterName, clusterUUID);
        return Optional.empty();
    }
    return Optional.of(newestManifest.get(0).name());
}
Expand Down Expand Up @@ -791,7 +810,7 @@ public IndexMetadataTransferException(String errorDesc, Throwable cause) {
* @param clusterName name of the cluster
* @param clusterUUIDs cluster UUIDs for which the remote state needs to be purged
*/
public void deleteStaleClusterMetadata(String clusterName, List<String> clusterUUIDs) {
private void deleteStaleUUIDsClusterMetadata(String clusterName, List<String> clusterUUIDs) {
clusterUUIDs.forEach(clusterUUID -> {
getBlobStoreTransferService().deleteAsync(
ThreadPool.Names.REMOTE_PURGE,
Expand Down Expand Up @@ -923,4 +942,27 @@ private void deleteStalePaths(String clusterName, String clusterUUID, List<Strin
logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
getBlobStoreTransferService().deleteBlobs(getCusterMetadataBasePath(clusterName, clusterUUID), stalePaths);
}

/**
 * Purges the remote cluster state of every cluster UUID found in remote except the two
 * referenced by the committed manifest (its cluster UUID and its previous cluster UUID).
 * The work is submitted to the {@link ThreadPool.Names#REMOTE_PURGE} executor, so this
 * method returns without waiting for the purge to run.
 *
 * @param clusterState current state of the cluster; only its cluster name is read here
 * @param committedManifest last committed ClusterMetadataManifest, whose current and previous
 *                          cluster UUIDs are retained
 */
public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) {
    threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
        String clusterName = clusterState.getClusterName().value();
        logger.info("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
        Set<String> allClustersUUIDsInRemote;
        try {
            allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterName));
        } catch (IOException e) {
            // Best-effort purge: give up on this round, but keep the cause so the failure can be diagnosed.
            logger.warn(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName), e);
            return;
        }
        // Retain last 2 cluster uuids data
        allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID());
        allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID());
        deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote));
    });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;

import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
Expand All @@ -76,6 +78,8 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class RemoteClusterStateServiceTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -334,13 +338,8 @@ public void testReadLatestMetadataManifestFailedIOException() throws IOException
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();

BlobContainer blobContainer = mockBlobStoreObjects();
when(
blobContainer.listBlobsByPrefixInSortedOrder(
"manifest" + RemoteClusterStateService.DELIMITER,
1,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
)
).thenThrow(IOException.class);
when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC))
.thenThrow(IOException.class);

remoteClusterStateService.start();
Exception e = assertThrows(
Expand All @@ -357,13 +356,8 @@ public void testReadLatestMetadataManifestFailedNoManifestFileInRemote() throws
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();

BlobContainer blobContainer = mockBlobStoreObjects();
when(
blobContainer.listBlobsByPrefixInSortedOrder(
"manifest" + RemoteClusterStateService.DELIMITER,
1,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
)
).thenReturn(List.of());
when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC))
.thenReturn(List.of());

remoteClusterStateService.start();
Optional<ClusterMetadataManifest> manifest = remoteClusterStateService.getLatestClusterMetadataManifest(
Expand All @@ -378,13 +372,8 @@ public void testReadLatestMetadataManifestFailedManifestFileRemoveAfterFetchInRe

BlobContainer blobContainer = mockBlobStoreObjects();
BlobMetadata blobMetadata = new PlainBlobMetadata("manifestFileName", 1);
when(
blobContainer.listBlobsByPrefixInSortedOrder(
"manifest" + RemoteClusterStateService.DELIMITER,
1,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
)
).thenReturn(Arrays.asList(blobMetadata));
when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC))
.thenReturn(Arrays.asList(blobMetadata));
when(blobContainer.readBlob("manifestFileName")).thenThrow(FileNotFoundException.class);

remoteClusterStateService.start();
Expand Down Expand Up @@ -618,6 +607,72 @@ public void testGetValidPreviousClusterUUIDWithInvalidMultipleChains() throws IO
assertThrows(IllegalStateException.class, () -> remoteClusterStateService.getLastKnownUUIDFromRemote("test-cluster"));
}

/**
 * Verifies that {@code deleteStaleClusterUUIDs} deletes the containers of cluster-uuid2 and
 * cluster-uuid3 when the committed manifest references cluster-uuid1 with an unknown previous UUID.
 */
public void testDeleteStaleClusterUUIDs() throws IOException {
    final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
    final ClusterMetadataManifest committedManifest = ClusterMetadataManifest.builder()
        .indices(List.of())
        .clusterTerm(1L)
        .stateVersion(1L)
        .stateUUID(randomAlphaOfLength(10))
        .clusterUUID("cluster-uuid1")
        .nodeId("nodeA")
        .opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
        .previousClusterUUID(ClusterState.UNKNOWN_UUID)
        .committed(true)
        .build();

    final BlobPath repositoryBasePath = new BlobPath().add("random-path");
    when(blobStoreRepository.basePath()).thenReturn(repositoryBasePath);

    final BlobContainer clusterUuidsRootContainer = mock(BlobContainer.class);
    final BlobContainer staleUuid2Container = mock(BlobContainer.class);
    final BlobContainer staleUuid3Container = mock(BlobContainer.class);

    // Route container lookups by path; any path other than the three below fails the lookup.
    when(blobStore.blobContainer(any())).thenAnswer(invocation -> {
        final BlobPath requestedPath = invocation.getArgument(0);
        final String path = requestedPath.buildAsString();
        if (path.endsWith("cluster-state/")) {
            return clusterUuidsRootContainer;
        }
        if (path.contains("cluster-state/cluster-uuid2/")) {
            return staleUuid2Container;
        }
        if (path.contains("cluster-state/cluster-uuid3/")) {
            return staleUuid3Container;
        }
        throw new IllegalArgumentException("Unexpected blob path " + requestedPath);
    });

    // The root container reports three cluster UUIDs as children.
    final Map<String, BlobContainer> clusterUuidChildren = Map.of(
        "cluster-uuid1",
        mock(BlobContainer.class),
        "cluster-uuid2",
        mock(BlobContainer.class),
        "cluster-uuid3",
        mock(BlobContainer.class)
    );
    when(clusterUuidsRootContainer.children()).thenReturn(clusterUuidChildren);

    final String manifestPrefix = MANIFEST_FILE_PREFIX + DELIMITER;
    when(
        staleUuid2Container.listBlobsByPrefixInSortedOrder(
            manifestPrefix,
            Integer.MAX_VALUE,
            BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
        )
    ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L)));
    when(
        staleUuid3Container.listBlobsByPrefixInSortedOrder(
            manifestPrefix,
            Integer.MAX_VALUE,
            BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
        )
    ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L)));

    remoteClusterStateService.start();
    remoteClusterStateService.deleteStaleClusterUUIDs(clusterState, committedManifest);

    // The call above returns before the deletions are verified, so poll until both have been observed.
    try {
        assertBusy(() -> {
            verify(staleUuid2Container, times(1)).delete();
            verify(staleUuid3Container, times(1)).delete();
        });
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

private void mockObjectsForGettingPreviousClusterUUID(Map<String, String> clusterUUIDsPointers) throws IOException {
final BlobPath blobPath = mock(BlobPath.class);
when((blobStoreRepository.basePath())).thenReturn(blobPath);
Expand Down Expand Up @@ -760,13 +815,8 @@ private void mockBlobContainer(
Map<String, IndexMetadata> indexMetadataMap
) throws IOException {
BlobMetadata blobMetadata = new PlainBlobMetadata("manifestFileName", 1);
when(
blobContainer.listBlobsByPrefixInSortedOrder(
"manifest" + RemoteClusterStateService.DELIMITER,
1,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
)
).thenReturn(Arrays.asList(blobMetadata));
when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC))
.thenReturn(Arrays.asList(blobMetadata));

BytesReference bytes = RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.serialize(
clusterMetadataManifest,
Expand Down