From e6bf8db3c670e8d504b2d5ef0b3f90c14b3a6f12 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Tue, 27 Aug 2024 17:43:02 +0530 Subject: [PATCH] Add tests and address comments Signed-off-by: Arpit Bandejiya --- CHANGELOG.md | 1 + .../RemoteClusterStateCleanupManagerIT.java | 13 ++- .../remote/RemoteStatePublicationIT.java | 35 +++++++ .../coordination/CoordinationState.java | 2 +- .../cluster/coordination/Coordinator.java | 10 +- .../coordination/InMemoryPersistedState.java | 2 +- .../PublicationTransportHandler.java | 23 +++-- .../opensearch/gateway/GatewayMetaState.java | 4 +- .../remote/RemoteClusterStateService.java | 32 +++---- .../gateway/remote/RemoteDownloadStats.java | 36 ------- .../remote/RemotePersistenceStats.java | 45 +++++---- .../gateway/remote/RemoteUploadStats.java | 5 + .../PublicationTransportHandlerTests.java | 93 ++++++++++++++++++- .../GatewayMetaStatePersistedStateTests.java | 7 +- .../RemoteClusterStateServiceTests.java | 6 +- .../AbstractCoordinatorTestCase.java | 2 +- 16 files changed, 219 insertions(+), 97 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 453f32ee74878..67091eaa6b850 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410)) - [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527)) - Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426)) +- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291))) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java index a4b7b7114fdac..47ec3f25bcd64 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java @@ -31,6 +31,7 @@ import java.util.Base64; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -40,6 +41,7 @@ import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS; import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteUploadStats.REMOTE_UPLOAD; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -253,10 +255,13 @@ private void verifyIndexRoutingFilesDeletion( DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats(); assertNotNull(discoveryStats.getClusterStateStats()); for (PersistedStateStats persistedStateStats : discoveryStats.getClusterStateStats().getPersistenceStats()) { - Map extendedFields = persistedStateStats.getExtendedFields(); - assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)); - long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT).get(); - assertEquals(0, cleanupAttemptFailedCount); + if (Objects.equals(persistedStateStats.getStatsName(), REMOTE_UPLOAD)) { + Map extendedFields = persistedStateStats.getExtendedFields(); + assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)); + long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT) + .get(); + assertEquals(0, cleanupAttemptFailedCount); + } } } diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index 6a2e7ce4957ae..f237a81942e1a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -8,12 +8,15 @@ package org.opensearch.gateway.remote; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.client.Client; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.discovery.DiscoveryStats; import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; @@ -155,6 +158,38 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() { assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class)); } + public void testRemotePublicationDownloadStats() { + int shardCount = randomIntBetween(1, 2); + int replicaCount = 1; + int dataNodeCount = shardCount * (replicaCount + 1); + int clusterManagerNodeCount = 1; + prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount); + String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0); + + NodesStatsResponse nodesStatsResponseDataNode = client().admin() + .cluster() + .prepareNodesStats(dataNode) + .addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName()) + .get(); + + assertDataNodeDownloadStats(nodesStatsResponseDataNode); + + } + + private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) { + // assert cluster state stats for data node + DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats(); + assertNotNull(dataNodeDiscoveryStats.getClusterStateStats()); + assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess()); + assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0); + assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount()); + assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0); + + assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getSuccessCount() > 0); + assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getFailedCount()); + assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getTotalTimeInMillis() > 0); + } + private Map getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException { BlobPath metadataPath = repository.basePath() .add( diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 89a825a745788..c7820c2c9a365 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -652,7 +652,7 @@ public interface PersistedState extends Closeable { * Returns the stats for the persistence layer for {@link CoordinationState}. * @return PersistedStateStats */ - PersistedStateStats getUploadStats(); + PersistedStateStats getStats(); /** * Marks the last accepted cluster state as committed. diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index baacffb1fe648..9aaaa77bcbb23 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -896,14 +896,14 @@ public DiscoveryStats stats() { ClusterStateStats clusterStateStats = clusterManagerService.getClusterStateStats(); ArrayList stats = new ArrayList<>(); Stream.of(PersistedStateRegistry.PersistedStateType.values()).forEach(stateType -> { - if (persistedStateRegistry.getPersistedState(stateType) != null) { - if (persistedStateRegistry.getPersistedState(stateType).getUploadStats() != null) { - stats.add(persistedStateRegistry.getPersistedState(stateType).getUploadStats()); - } + if (persistedStateRegistry.getPersistedState(stateType) != null + && persistedStateRegistry.getPersistedState(stateType).getStats() != null) { + stats.add(persistedStateRegistry.getPersistedState(stateType).getStats()); } }); if (coordinationState.get().isRemotePublicationEnabled()) { - stats.add(publicationHandler.getDownloadStats()); + stats.add(publicationHandler.getFullDownloadStats()); + stats.add(publicationHandler.getDiffDownloadStats()); } clusterStateStats.setPersistenceStats(stats); return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java b/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java index 79fdb6bd922d8..b77ede5471534 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java @@ -66,7 +66,7 @@ public void setLastAcceptedState(ClusterState clusterState) { } @Override - public PersistedStateStats getUploadStats() { + public PersistedStateStats getStats() { return null; } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 620f5f862f649..ca36011b3a0e9 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -178,8 +178,12 @@ public PublishClusterStateStats stats() { ); } - public PersistedStateStats getDownloadStats() { - return remoteClusterStateService.getDownloadStats(); + public PersistedStateStats getFullDownloadStats() { + return remoteClusterStateService.getFullDownloadStats(); + } + + public PersistedStateStats getDiffDownloadStats() { + return remoteClusterStateService.getDiffDownloadStats(); } private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException { @@ -235,7 +239,8 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque } // package private for testing - PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, RuntimeException { + PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException { + boolean applyFullState = false; try { if (transportService.getLocalNode().equals(request.getSourceNode())) { return acceptRemoteStateOnLocalNode(request); @@ -248,7 +253,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest if (manifest == null) { throw new IllegalStateException("Publication failed as manifest was not found for " + request); } - boolean applyFullState = false; final ClusterState lastSeen = lastSeenClusterState.get(); if (lastSeen == null) { logger.debug(() -> "Diff cannot be applied as there is no last cluster state"); @@ -262,7 +266,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest } if (applyFullState == true) { - remoteClusterStateService.fullDownloadState(); logger.debug( () -> new ParameterizedMessage( "Downloading full cluster state for term {}, version {}, stateUUID {}", @@ -282,7 +285,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest lastSeenClusterState.set(clusterState); return response; } else { - remoteClusterStateService.diffDownloadState(); logger.debug( () -> new ParameterizedMessage( "Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}", @@ -303,9 +305,12 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest return response; } } catch (Exception e) { - remoteClusterStateService.readMetadataFailed(); - if (e instanceof IOException) throw new IOException("IOException in reading remote cluster state", e); - throw new RuntimeException("Runtime exception in reading remote cluster state", e); + if (applyFullState) { + remoteClusterStateService.fullDownloadFailed(); + } else { + remoteClusterStateService.diffDownloadFailed(); + } + throw e; } } diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index 62f3b150db782..bd56c9e1757c6 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -618,7 +618,7 @@ public void setLastAcceptedState(ClusterState clusterState) { } @Override - public PersistedStateStats getUploadStats() { + public PersistedStateStats getStats() { // Note: These stats are not published yet, will come in future return null; } @@ -745,7 +745,7 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest( } @Override - public PersistedStateStats getUploadStats() { + public PersistedStateStats getStats() { return remoteClusterStateService.getUploadStats(); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 1d3899e71cdc2..2b3913ced0144 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1364,8 +1364,8 @@ public ClusterState getClusterStateForManifest( clusterState = ClusterState.builder(state).metadata(mb).build(); } final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); - remoteStateStats.stateDownloadSucceeded(); - remoteStateStats.stateDownloadTook(durationMillis); + remoteStateStats.stateFullDownloadSucceeded(); + remoteStateStats.stateFullDownloadTook(durationMillis); return clusterState; } @@ -1456,8 +1456,8 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C .build(); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); - remoteStateStats.stateDownloadSucceeded(); - remoteStateStats.stateDownloadTook(durationMillis); + remoteStateStats.stateDiffDownloadSucceeded(); + remoteStateStats.stateDiffDownloadTook(durationMillis); return clusterState; } @@ -1658,27 +1658,27 @@ public void writeMetadataFailed() { remoteStateStats.stateUploadFailed(); } - public void readMetadataFailed() { - remoteStateStats.stateDownloadFailed(); + public RemotePersistenceStats getRemoteStateStats() { + return remoteStateStats; } - public void fullDownloadState() { - remoteStateStats.fullDownloadState(); + public PersistedStateStats getUploadStats() { + return remoteStateStats.getUploadStats(); } - public void diffDownloadState() { - remoteStateStats.diffDownloadState(); + public PersistedStateStats getFullDownloadStats() { + return remoteStateStats.getRemoteFullDownloadStats(); } - public RemotePersistenceStats getRemoteStateStats() { - return remoteStateStats; + public PersistedStateStats getDiffDownloadStats() { + return remoteStateStats.getRemoteDiffDownloadStats(); } - public PersistedStateStats getUploadStats() { - return remoteStateStats.getUploadStats(); + public void fullDownloadFailed() { + remoteStateStats.stateFullDownloadFailed(); } - public PersistedStateStats getDownloadStats() { - return remoteStateStats.getDownloadStats(); + public void diffDownloadFailed() { + remoteStateStats.stateDiffDownloadFailed(); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java deleted file mode 100644 index 8c1505ab5ed89..0000000000000 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote; - -import org.opensearch.cluster.coordination.PersistedStateStats; - -import java.util.concurrent.atomic.AtomicLong; - -public class RemoteDownloadStats extends PersistedStateStats { - static final String REMOTE_DOWNLOAD = "remote_download"; - static final String DIFF_DOWNLOAD = "diff_download"; - private AtomicLong diffDownloadCount = new AtomicLong(0); - static final String FULL_DOWNLOAD = "full_download"; - private AtomicLong fullDownloadCount = new AtomicLong(0); - - public RemoteDownloadStats() { - super(REMOTE_DOWNLOAD); - addToExtendedFields(DIFF_DOWNLOAD, diffDownloadCount); - addToExtendedFields(FULL_DOWNLOAD, fullDownloadCount); - } - - public void diffDownloadState() { - diffDownloadCount.incrementAndGet(); - } - - public void fullDownloadState() { - fullDownloadCount.incrementAndGet(); - } - -} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java index e9fe9a94c0377..417ebdafd3ba7 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java @@ -18,11 +18,16 @@ public class RemotePersistenceStats { RemoteUploadStats remoteUploadStats; - RemoteDownloadStats remoteDownloadStats; + PersistedStateStats remoteDiffDownloadStats; + PersistedStateStats remoteFullDownloadStats; + + final String FULL_DOWNLOAD_STATS = "remote_full_download"; + final String DIFF_DOWNLOAD_STATS = "remote_diff_download"; public RemotePersistenceStats() { remoteUploadStats = new RemoteUploadStats(); - remoteDownloadStats = new RemoteDownloadStats(); + remoteDiffDownloadStats = new PersistedStateStats(DIFF_DOWNLOAD_STATS); + remoteFullDownloadStats = new PersistedStateStats(FULL_DOWNLOAD_STATS); } public void cleanUpAttemptFailed() { @@ -61,32 +66,40 @@ public void stateUploadFailed() { remoteUploadStats.stateFailed(); } - public void stateDownloadSucceeded() { - remoteDownloadStats.stateSucceeded(); + public void stateFullDownloadSucceeded() { + remoteFullDownloadStats.stateSucceeded(); } - public void stateDownloadTook(long durationMillis) { - remoteDownloadStats.stateTook(durationMillis); + public void stateDiffDownloadSucceeded() { + remoteDiffDownloadStats.stateSucceeded(); } - public void stateDownloadFailed() { - remoteDownloadStats.stateFailed(); + public void stateFullDownloadTook(long durationMillis) { + remoteFullDownloadStats.stateTook(durationMillis); } - public PersistedStateStats getUploadStats() { - return remoteUploadStats; + public void stateDiffDownloadTook(long durationMillis) { + remoteDiffDownloadStats.stateTook(durationMillis); + } + + public void stateFullDownloadFailed() { + remoteFullDownloadStats.stateFailed(); } - public PersistedStateStats getDownloadStats() { - return remoteDownloadStats; + public void stateDiffDownloadFailed() { + remoteDiffDownloadStats.stateFailed(); + } + + public PersistedStateStats getUploadStats() { + return remoteUploadStats; } - public void diffDownloadState() { - remoteDownloadStats.diffDownloadState(); + public PersistedStateStats getRemoteDiffDownloadStats() { + return remoteDiffDownloadStats; } - public void fullDownloadState() { - remoteDownloadStats.fullDownloadState(); + public PersistedStateStats getRemoteFullDownloadStats() { + return remoteFullDownloadStats; } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteUploadStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteUploadStats.java index d8ba9219e7bbe..9ffef65ae1eba 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteUploadStats.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteUploadStats.java @@ -12,6 +12,11 @@ import java.util.concurrent.atomic.AtomicLong; +/** + * Upload stats for remote state + * + * @opensearch.internal + */ public class RemoteUploadStats extends PersistedStateStats { static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count"; static final String INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT = "index_routing_files_cleanup_attempt_failed_count"; diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java index 65bc228cd5704..266928c919fe2 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java @@ -50,6 +50,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.gateway.GatewayMetaState.RemotePersistedState; import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.ClusterStateDiffManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.telemetry.tracing.noop.NoopTracer; @@ -74,8 +75,12 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class PublicationTransportHandlerTests extends OpenSearchTestCase { @@ -175,7 +180,9 @@ public void testHandleIncomingRemotePublishRequestWhenNoCurrentPublishRequest() () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest) ); assertThat(e.getMessage(), containsString("publication to self failed")); - Mockito.verifyNoInteractions(remoteClusterStateService); + verify(remoteClusterStateService, times(0)).fullDownloadFailed(); + verify(remoteClusterStateService, times(1)).diffDownloadFailed(); + verifyNoMoreInteractions(remoteClusterStateService); } public void testHandleIncomingRemotePublishRequestWhenTermMismatch() { @@ -200,7 +207,9 @@ public void testHandleIncomingRemotePublishRequestWhenTermMismatch() { () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest) ); assertThat(e.getMessage(), containsString("publication to self failed")); - Mockito.verifyNoInteractions(remoteClusterStateService); + verify(remoteClusterStateService, times(0)).fullDownloadFailed(); + verify(remoteClusterStateService, times(1)).diffDownloadFailed(); + verifyNoMoreInteractions(remoteClusterStateService); } public void testHandleIncomingRemotePublishRequestWhenVersionMismatch() { @@ -225,7 +234,9 @@ public void testHandleIncomingRemotePublishRequestWhenVersionMismatch() { () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest) ); assertThat(e.getMessage(), containsString("publication to self failed")); - Mockito.verifyNoInteractions(remoteClusterStateService); + verify(remoteClusterStateService, times(1)).diffDownloadFailed(); + verify(remoteClusterStateService, times(0)).fullDownloadFailed(); + verifyNoMoreInteractions(remoteClusterStateService); } public void testHandleIncomingRemotePublishRequestForLocalNode() throws IOException { @@ -250,6 +261,82 @@ public void testHandleIncomingRemotePublishRequestForLocalNode() throws IOExcept Mockito.verifyNoInteractions(remoteClusterStateService); } + public void testDownloadRemotePersistedFullStateFailedStats() throws IOException { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + PersistedStateStats remoteFullDownloadStats = new PersistedStateStats("dummy_full_stats"); + PersistedStateStats remoteDiffDownloadStats = new PersistedStateStats("dummy_diff_stats"); + when(remoteClusterStateService.getFullDownloadStats()).thenReturn(remoteFullDownloadStats); + when(remoteClusterStateService.getDiffDownloadStats()).thenReturn(remoteDiffDownloadStats); + + doAnswer((i) -> { + remoteFullDownloadStats.stateFailed(); + return null; + }).when(remoteClusterStateService).fullDownloadFailed(); + + doAnswer((i) -> { + remoteDiffDownloadStats.stateFailed(); + return null; + }).when(remoteClusterStateService).diffDownloadFailed(); + + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + secondNode, + TERM, + VERSION, + CLUSTER_NAME, + CLUSTER_UUID, + MANIFEST_FILE + ); + ClusterState clusterState = buildClusterState(TERM, VERSION); + PublishRequest publishRequest = new PublishRequest(clusterState); + handler.setCurrentPublishRequestToSelf(publishRequest); + + assertThrows(IllegalStateException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)); + assertEquals(1, remoteClusterStateService.getDiffDownloadStats().getFailedCount()); + assertEquals(0, remoteClusterStateService.getFullDownloadStats().getFailedCount()); + } + + public void testDownloadRemotePersistedDiffStateFailedStats() throws IOException { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + PersistedStateStats remoteDiffDownloadStats = new PersistedStateStats("dummy_stats"); + when(remoteClusterStateService.getDiffDownloadStats()).thenReturn(remoteDiffDownloadStats); + + ClusterMetadataManifest metadataManifest = new ClusterMetadataManifest.Builder().diffManifest( + new ClusterStateDiffManifest.Builder().fromStateUUID("state-uuid").build() + ).build(); + when(remoteClusterStateService.getClusterMetadataManifestByFileName(any(), any())).thenReturn(metadataManifest); + + doAnswer((i) -> { + remoteDiffDownloadStats.stateFailed(); + return null; + }).when(remoteClusterStateService).diffDownloadFailed(); + + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + ClusterState clusterState = mock(ClusterState.class); + when(clusterState.nodes()).thenReturn(mock(DiscoveryNodes.class)); + handler.setLastSeenClusterState(clusterState); + when(clusterState.stateUUID()).thenReturn("state-uuid"); + RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + secondNode, + TERM, + VERSION, + CLUSTER_NAME, + CLUSTER_UUID, + MANIFEST_FILE + ); + clusterState = buildClusterState(TERM, VERSION); + PublishRequest publishRequest = new PublishRequest(clusterState); + handler.setCurrentPublishRequestToSelf(publishRequest); + + assertThrows(NullPointerException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)); + assertEquals(1, remoteClusterStateService.getDiffDownloadStats().getFailedCount()); + + } + public void testHandleIncomingRemotePublishRequestWhenManifestNotFound() throws IOException { RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 73eccc1b17c7f..5ac94281822b8 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -112,7 +112,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -906,7 +906,10 @@ public void testRemotePersistedStateFailureStats() throws IOException { .when(remoteClusterStateService) .writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION)); when(remoteClusterStateService.getUploadStats()).thenReturn(remoteStateStats); - doCallRealMethod().when(remoteClusterStateService).writeMetadataFailed(); + doAnswer((i) -> { + remoteStateStats.stateFailed(); + return null; + }).when(remoteClusterStateService).writeMetadataFailed(); CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); final long clusterTerm = randomNonNegativeLong(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index cd03d948f76ef..d57cc17acedcb 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -861,6 +861,10 @@ public void testGetClusterStateForManifest_IncludeEphemeral() throws IOException when(mockedResult.getComponent()).thenReturn(COORDINATION_METADATA); RemoteClusterStateService mockService = spy(remoteClusterStateService); mockService.getClusterStateForManifest(ClusterName.DEFAULT.value(), manifest, NODE_ID, true); + + assertNotNull(remoteClusterStateService.getFullDownloadStats()); + assertEquals(1, remoteClusterStateService.getFullDownloadStats().getSuccessCount()); + assertEquals(0, remoteClusterStateService.getFullDownloadStats().getFailedCount()); verify(mockService, times(1)).readClusterStateInParallel( any(), eq(manifest), @@ -2590,7 +2594,7 @@ public void testGetValidPreviousClusterUUIDWhenLastUUIDUncommitted() throws IOEx assertThat(previousClusterUUID, equalTo("cluster-uuid2")); } - public void testRemoteStateStats() throws IOException { + public void testRemoteStateUploadStats() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); remoteClusterStateService.start(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 1bcc5b03e1eb5..b432e5411404e 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1026,7 +1026,7 @@ public void setLastAcceptedState(ClusterState clusterState) { } @Override - public PersistedStateStats getUploadStats() { + public PersistedStateStats getStats() { return null; }