From b8f74eedc5976da036d6d1efd494f3c0b7412401 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Tue, 27 Aug 2024 17:43:02 +0530 Subject: [PATCH] Add UTs and address comments Signed-off-by: Arpit Bandejiya --- .../coordination/CoordinationState.java | 2 +- .../cluster/coordination/Coordinator.java | 7 +- .../coordination/InMemoryPersistedState.java | 2 +- .../PublicationTransportHandler.java | 9 +- .../opensearch/gateway/GatewayMetaState.java | 4 +- .../gateway/remote/RemoteDownloadStats.java | 12 ++ .../gateway/remote/RemoteUploadStats.java | 5 + .../PublicationTransportHandlerTests.java | 128 +++++++++++++++++- .../GatewayMetaStatePersistedStateTests.java | 7 +- .../RemoteClusterStateServiceTests.java | 6 +- .../AbstractCoordinatorTestCase.java | 2 +- 11 files changed, 164 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 89a825a745788..c7820c2c9a365 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -652,7 +652,7 @@ public interface PersistedState extends Closeable { * Returns the stats for the persistence layer for {@link CoordinationState}. * @return PersistedStateStats */ - PersistedStateStats getUploadStats(); + PersistedStateStats getStats(); /** * Marks the last accepted cluster state as committed. diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 93766267ee196..ae8c106f595d8 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -896,10 +896,9 @@ public DiscoveryStats stats() { ClusterStateStats clusterStateStats = clusterManagerService.getClusterStateStats(); ArrayList stats = new ArrayList<>(); Stream.of(PersistedStateRegistry.PersistedStateType.values()).forEach(stateType -> { - if (persistedStateRegistry.getPersistedState(stateType) != null) { - if (persistedStateRegistry.getPersistedState(stateType).getUploadStats() != null) { - stats.add(persistedStateRegistry.getPersistedState(stateType).getUploadStats()); - } + if (persistedStateRegistry.getPersistedState(stateType) != null + && persistedStateRegistry.getPersistedState(stateType).getStats() != null) { + stats.add(persistedStateRegistry.getPersistedState(stateType).getStats()); } }); if (coordinationState.get().isRemotePublicationEnabled()) { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java b/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java index 79fdb6bd922d8..b77ede5471534 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java @@ -66,7 +66,7 @@ public void setLastAcceptedState(ClusterState clusterState) { } @Override - public PersistedStateStats getUploadStats() { + public PersistedStateStats getStats() { return null; } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index d3e592d0d4e5a..01db68834cc23 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -233,7 +233,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque } // package private for testing - PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, RuntimeException { + PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException { try { if (transportService.getLocalNode().equals(request.getSourceNode())) { return acceptRemoteStateOnLocalNode(request); @@ -260,7 +260,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest } if (applyFullState == true) { - remoteClusterStateService.fullDownloadState(); logger.debug( () -> new ParameterizedMessage( "Downloading full cluster state for term {}, version {}, stateUUID {}", @@ -278,9 +277,9 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest fullClusterStateReceivedCount.incrementAndGet(); final PublishWithJoinResponse response = acceptState(clusterState); lastSeenClusterState.set(clusterState); + remoteClusterStateService.fullDownloadState(); return response; } else { - remoteClusterStateService.diffDownloadState(); logger.debug( () -> new ParameterizedMessage( "Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}", @@ -298,12 +297,12 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest compatibleClusterStateDiffReceivedCount.incrementAndGet(); final PublishWithJoinResponse response = acceptState(clusterState); lastSeenClusterState.compareAndSet(lastSeen, clusterState); + remoteClusterStateService.diffDownloadState(); return response; } } catch (Exception e) { remoteClusterStateService.readMetadataFailed(); - if (e instanceof IOException) throw new IOException("IOException in reading remote cluster state", e); - throw new RuntimeException("Runtime exception in reading remote cluster state", e); + throw e; } } diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index dd98107d61c5a..1fd3e00d1247a 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -618,7 +618,7 @@ public void setLastAcceptedState(ClusterState clusterState) { } @Override - public PersistedStateStats getUploadStats() { + public PersistedStateStats getStats() { // Note: These stats are not published yet, will come in future return null; } @@ -740,7 +740,7 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest( } @Override - public PersistedStateStats getUploadStats() { + public PersistedStateStats getStats() { return remoteClusterStateService.getUploadStats(); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java index 8c1505ab5ed89..4794058463695 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java @@ -12,6 +12,11 @@ import java.util.concurrent.atomic.AtomicLong; +/** + * Download stats for remote publication + * + * @opensearch.internal + */ public class RemoteDownloadStats extends PersistedStateStats { static final String REMOTE_DOWNLOAD = "remote_download"; static final String DIFF_DOWNLOAD = "diff_download"; @@ -33,4 +38,11 @@ public void fullDownloadState() { fullDownloadCount.incrementAndGet(); } + public long getDiffDownloadCount() { + return diffDownloadCount.get(); + } + + public long getFullDownloadCount() { + return fullDownloadCount.get(); + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteUploadStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteUploadStats.java index d8ba9219e7bbe..9ffef65ae1eba 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteUploadStats.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteUploadStats.java @@ -12,6 +12,11 @@ import java.util.concurrent.atomic.AtomicLong; +/** + * Upload stats for remote state + * + * @opensearch.internal + */ public class RemoteUploadStats extends PersistedStateStats { static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count"; static final String INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT = "index_routing_files_cleanup_attempt_failed_count"; diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java index 08e3f47100d8c..c252b82c35875 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java @@ -44,7 +44,9 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.ClusterStateDiffManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.RemoteDownloadStats; import org.opensearch.node.Node; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; @@ -62,8 +64,12 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class PublicationTransportHandlerTests extends OpenSearchTestCase { @@ -160,7 +166,8 @@ public void testHandleIncomingRemotePublishRequestWhenNoCurrentPublishRequest() () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest) ); assertThat(e.getMessage(), containsString("publication to self failed")); - Mockito.verifyNoInteractions(remoteClusterStateService); + verify(remoteClusterStateService, times(1)).readMetadataFailed(); + verifyNoMoreInteractions(remoteClusterStateService); } public void testHandleIncomingRemotePublishRequestWhenTermMismatch() { @@ -185,7 +192,8 @@ public void testHandleIncomingRemotePublishRequestWhenTermMismatch() { () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest) ); assertThat(e.getMessage(), containsString("publication to self failed")); - Mockito.verifyNoInteractions(remoteClusterStateService); + verify(remoteClusterStateService, times(1)).readMetadataFailed(); + verifyNoMoreInteractions(remoteClusterStateService); } public void testHandleIncomingRemotePublishRequestWhenVersionMismatch() { @@ -210,7 +218,8 @@ public void testHandleIncomingRemotePublishRequestWhenVersionMismatch() { () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest) ); assertThat(e.getMessage(), containsString("publication to self failed")); - Mockito.verifyNoInteractions(remoteClusterStateService); + verify(remoteClusterStateService, times(1)).readMetadataFailed(); + verifyNoMoreInteractions(remoteClusterStateService); } public void testHandleIncomingRemotePublishRequestForLocalNode() throws IOException { @@ -235,6 +244,119 @@ public void testHandleIncomingRemotePublishRequestForLocalNode() throws IOExcept Mockito.verifyNoInteractions(remoteClusterStateService); } + public void testDownloadRemotePersistedFailedStats() throws IOException { + RemoteDownloadStats remoteDownloadStats = new RemoteDownloadStats(); + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + when(remoteClusterStateService.getDownloadStats()).thenReturn(remoteDownloadStats); + + doAnswer((i) -> { + remoteDownloadStats.stateFailed(); + return null; + }).when(remoteClusterStateService).readMetadataFailed(); + + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + secondNode, + TERM, + VERSION, + CLUSTER_NAME, + CLUSTER_UUID, + MANIFEST_FILE + ); + ClusterState clusterState = buildClusterState(TERM, VERSION); + PublishRequest publishRequest = new PublishRequest(clusterState); + handler.setCurrentPublishRequestToSelf(publishRequest); + + assertThrows(IllegalStateException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)); + assertEquals(1, remoteClusterStateService.getDownloadStats().getFailedCount()); + } + + public void testDownloadRemotePersistedDiffStats() throws IOException { + RemoteDownloadStats remoteDownloadStats = new RemoteDownloadStats(); + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + when(remoteClusterStateService.getDownloadStats()).thenReturn(remoteDownloadStats); + ClusterMetadataManifest metadataManifest = new ClusterMetadataManifest.Builder().diffManifest( + new ClusterStateDiffManifest.Builder().fromStateUUID("state-uuid").build() + ).build(); + when(remoteClusterStateService.getClusterMetadataManifestByFileName(any(), any())).thenReturn(metadataManifest); + + doAnswer((i) -> { + remoteDownloadStats.diffDownloadState(); + return null; + }).when(remoteClusterStateService).diffDownloadState(); + + doAnswer((i) -> { + remoteDownloadStats.fullDownloadState(); + return null; + }).when(remoteClusterStateService).fullDownloadState(); + + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + ClusterState clusterState = mock(ClusterState.class); + handler.setLastSeenClusterState(clusterState); + when(clusterState.stateUUID()).thenReturn("state-uuid"); + + RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + secondNode, + TERM, + VERSION, + CLUSTER_NAME, + CLUSTER_UUID, + MANIFEST_FILE + ); + clusterState = buildClusterState(TERM, VERSION); + PublishRequest publishRequest = new PublishRequest(clusterState); + handler.setCurrentPublishRequestToSelf(publishRequest); + assertThrows(NullPointerException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)); + assertEquals(1, remoteDownloadStats.getDiffDownloadCount()); + assertEquals(0, remoteDownloadStats.getFullDownloadCount()); + } + + public void testDownloadRemotePersistedFullStats() throws IOException { + RemoteDownloadStats remoteDownloadStats = new RemoteDownloadStats(); + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + when(remoteClusterStateService.getDownloadStats()).thenReturn(remoteDownloadStats); + ClusterMetadataManifest metadataManifest = new ClusterMetadataManifest.Builder().diffManifest( + new ClusterStateDiffManifest.Builder().fromStateUUID("state-uuid2").build() + ).build(); + when(remoteClusterStateService.getClusterMetadataManifestByFileName(any(), any())).thenReturn(metadataManifest); + + doAnswer((i) -> { + remoteDownloadStats.diffDownloadState(); + return null; + }).when(remoteClusterStateService).diffDownloadState(); + + doAnswer((i) -> { + remoteDownloadStats.fullDownloadState(); + return null; + }).when(remoteClusterStateService).fullDownloadState(); + + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + ClusterState clusterState = mock(ClusterState.class); + handler.setLastSeenClusterState(clusterState); + when(clusterState.stateUUID()).thenReturn("state-uuid"); + + RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + secondNode, + TERM, + VERSION, + CLUSTER_NAME, + CLUSTER_UUID, + MANIFEST_FILE + ); + clusterState = buildClusterState(TERM, VERSION); + PublishRequest publishRequest = new PublishRequest(clusterState); + handler.setCurrentPublishRequestToSelf(publishRequest); + assertThrows(NullPointerException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)); + assertEquals(0, remoteDownloadStats.getDiffDownloadCount()); + assertEquals(1, remoteDownloadStats.getFullDownloadCount()); + } + public void testHandleIncomingRemotePublishRequestWhenManifestNotFound() throws IOException { RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 191d40862fe6d..71cb5ab0687b1 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -109,7 +109,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -837,7 +837,10 @@ public void testRemotePersistedStateFailureStats() throws IOException { final String previousClusterUUID = "prev-cluster-uuid"; Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any(), Mockito.any()); when(remoteClusterStateService.getUploadStats()).thenReturn(remoteStateStats); - doCallRealMethod().when(remoteClusterStateService).writeMetadataFailed(); + doAnswer((i) -> { + remoteStateStats.stateFailed(); + return null; + }).when(remoteClusterStateService).writeMetadataFailed(); CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); final long clusterTerm = randomNonNegativeLong(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 4cbd0e2fdcf00..40919ca718a28 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -851,6 +851,10 @@ public void testGetClusterStateForManifest_IncludeEphemeral() throws IOException when(mockedResult.getComponent()).thenReturn(COORDINATION_METADATA); RemoteClusterStateService mockService = spy(remoteClusterStateService); mockService.getClusterStateForManifest(ClusterName.DEFAULT.value(), manifest, NODE_ID, true); + + assertNotNull(remoteClusterStateService.getDownloadStats()); + assertEquals(1, remoteClusterStateService.getDownloadStats().getSuccessCount()); + assertEquals(0, remoteClusterStateService.getDownloadStats().getFailedCount()); verify(mockService, times(1)).readClusterStateInParallel( any(), eq(manifest), @@ -2568,7 +2572,7 @@ public void testGetValidPreviousClusterUUIDWhenLastUUIDUncommitted() throws IOEx assertThat(previousClusterUUID, equalTo("cluster-uuid2")); } - public void testRemoteStateStats() throws IOException { + public void testRemoteStateUploadStats() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); remoteClusterStateService.start(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 1bcc5b03e1eb5..b432e5411404e 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1026,7 +1026,7 @@ public void setLastAcceptedState(ClusterState clusterState) { } @Override - public PersistedStateStats getUploadStats() { + public PersistedStateStats getStats() { return null; }