Skip to content

Commit

Permalink
Add UTs and address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
  • Loading branch information
Arpit-Bandejiya committed Aug 28, 2024
1 parent cc0871e commit b8f74ee
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ public interface PersistedState extends Closeable {
* Returns the stats for the persistence layer for {@link CoordinationState}.
* @return PersistedStateStats
*/
PersistedStateStats getUploadStats();
PersistedStateStats getStats();

/**
* Marks the last accepted cluster state as committed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,10 +896,9 @@ public DiscoveryStats stats() {
ClusterStateStats clusterStateStats = clusterManagerService.getClusterStateStats();
ArrayList<PersistedStateStats> stats = new ArrayList<>();
Stream.of(PersistedStateRegistry.PersistedStateType.values()).forEach(stateType -> {
if (persistedStateRegistry.getPersistedState(stateType) != null) {
if (persistedStateRegistry.getPersistedState(stateType).getUploadStats() != null) {
stats.add(persistedStateRegistry.getPersistedState(stateType).getUploadStats());
}
if (persistedStateRegistry.getPersistedState(stateType) != null
&& persistedStateRegistry.getPersistedState(stateType).getStats() != null) {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
if (coordinationState.get().isRemotePublicationEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
}

@Override
public PersistedStateStats getUploadStats() {
public PersistedStateStats getStats() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}

// package private for testing
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, RuntimeException {
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException {
try {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptRemoteStateOnLocalNode(request);
Expand All @@ -260,7 +260,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
}

if (applyFullState == true) {
remoteClusterStateService.fullDownloadState();
logger.debug(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
Expand All @@ -278,9 +277,9 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
remoteClusterStateService.fullDownloadState();
return response;
} else {
remoteClusterStateService.diffDownloadState();
logger.debug(
() -> new ParameterizedMessage(
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
Expand All @@ -298,12 +297,12 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
remoteClusterStateService.diffDownloadState();
return response;
}
} catch (Exception e) {
remoteClusterStateService.readMetadataFailed();
if (e instanceof IOException) throw new IOException("IOException in reading remote cluster state", e);
throw new RuntimeException("Runtime exception in reading remote cluster state", e);
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
}

@Override
public PersistedStateStats getUploadStats() {
public PersistedStateStats getStats() {
// Note: These stats are not published yet, will come in future
return null;
}
Expand Down Expand Up @@ -740,7 +740,7 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(
}

@Override
public PersistedStateStats getUploadStats() {
public PersistedStateStats getStats() {
return remoteClusterStateService.getUploadStats();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@

import java.util.concurrent.atomic.AtomicLong;

/**
* Download stats for remote publication
*
* @opensearch.internal
*/
public class RemoteDownloadStats extends PersistedStateStats {
static final String REMOTE_DOWNLOAD = "remote_download";
static final String DIFF_DOWNLOAD = "diff_download";
Expand All @@ -33,4 +38,11 @@ public void fullDownloadState() {
fullDownloadCount.incrementAndGet();
}

public long getDiffDownloadCount() {
return diffDownloadCount.get();
}

public long getFullDownloadCount() {
return fullDownloadCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@

import java.util.concurrent.atomic.AtomicLong;

/**
* Upload stats for remote state
*
* @opensearch.internal
*/
public class RemoteUploadStats extends PersistedStateStats {
static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count";
static final String INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT = "index_routing_files_cleanup_attempt_failed_count";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.ClusterStateDiffManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteDownloadStats;
import org.opensearch.node.Node;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
Expand All @@ -62,8 +64,12 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class PublicationTransportHandlerTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -160,7 +166,8 @@ public void testHandleIncomingRemotePublishRequestWhenNoCurrentPublishRequest()
() -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)
);
assertThat(e.getMessage(), containsString("publication to self failed"));
Mockito.verifyNoInteractions(remoteClusterStateService);
verify(remoteClusterStateService, times(1)).readMetadataFailed();
verifyNoMoreInteractions(remoteClusterStateService);
}

public void testHandleIncomingRemotePublishRequestWhenTermMismatch() {
Expand All @@ -185,7 +192,8 @@ public void testHandleIncomingRemotePublishRequestWhenTermMismatch() {
() -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)
);
assertThat(e.getMessage(), containsString("publication to self failed"));
Mockito.verifyNoInteractions(remoteClusterStateService);
verify(remoteClusterStateService, times(1)).readMetadataFailed();
verifyNoMoreInteractions(remoteClusterStateService);
}

public void testHandleIncomingRemotePublishRequestWhenVersionMismatch() {
Expand All @@ -210,7 +218,8 @@ public void testHandleIncomingRemotePublishRequestWhenVersionMismatch() {
() -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)
);
assertThat(e.getMessage(), containsString("publication to self failed"));
Mockito.verifyNoInteractions(remoteClusterStateService);
verify(remoteClusterStateService, times(1)).readMetadataFailed();
verifyNoMoreInteractions(remoteClusterStateService);
}

public void testHandleIncomingRemotePublishRequestForLocalNode() throws IOException {
Expand All @@ -235,6 +244,119 @@ public void testHandleIncomingRemotePublishRequestForLocalNode() throws IOExcept
Mockito.verifyNoInteractions(remoteClusterStateService);
}

public void testDownloadRemotePersistedFailedStats() throws IOException {
RemoteDownloadStats remoteDownloadStats = new RemoteDownloadStats();
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
when(remoteClusterStateService.getDownloadStats()).thenReturn(remoteDownloadStats);

doAnswer((i) -> {
remoteDownloadStats.stateFailed();
return null;
}).when(remoteClusterStateService).readMetadataFailed();

PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService);
RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
secondNode,
TERM,
VERSION,
CLUSTER_NAME,
CLUSTER_UUID,
MANIFEST_FILE
);
ClusterState clusterState = buildClusterState(TERM, VERSION);
PublishRequest publishRequest = new PublishRequest(clusterState);
handler.setCurrentPublishRequestToSelf(publishRequest);

assertThrows(IllegalStateException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest));
assertEquals(1, remoteClusterStateService.getDownloadStats().getFailedCount());
}

public void testDownloadRemotePersistedDiffStats() throws IOException {
RemoteDownloadStats remoteDownloadStats = new RemoteDownloadStats();
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
when(remoteClusterStateService.getDownloadStats()).thenReturn(remoteDownloadStats);
ClusterMetadataManifest metadataManifest = new ClusterMetadataManifest.Builder().diffManifest(
new ClusterStateDiffManifest.Builder().fromStateUUID("state-uuid").build()
).build();
when(remoteClusterStateService.getClusterMetadataManifestByFileName(any(), any())).thenReturn(metadataManifest);

doAnswer((i) -> {
remoteDownloadStats.diffDownloadState();
return null;
}).when(remoteClusterStateService).diffDownloadState();

doAnswer((i) -> {
remoteDownloadStats.fullDownloadState();
return null;
}).when(remoteClusterStateService).fullDownloadState();

PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService);
ClusterState clusterState = mock(ClusterState.class);
handler.setLastSeenClusterState(clusterState);
when(clusterState.stateUUID()).thenReturn("state-uuid");

RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
secondNode,
TERM,
VERSION,
CLUSTER_NAME,
CLUSTER_UUID,
MANIFEST_FILE
);
clusterState = buildClusterState(TERM, VERSION);
PublishRequest publishRequest = new PublishRequest(clusterState);
handler.setCurrentPublishRequestToSelf(publishRequest);
assertThrows(NullPointerException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest));
assertEquals(1, remoteDownloadStats.getDiffDownloadCount());
assertEquals(0, remoteDownloadStats.getFullDownloadCount());
}

public void testDownloadRemotePersistedFullStats() throws IOException {
RemoteDownloadStats remoteDownloadStats = new RemoteDownloadStats();
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
when(remoteClusterStateService.getDownloadStats()).thenReturn(remoteDownloadStats);
ClusterMetadataManifest metadataManifest = new ClusterMetadataManifest.Builder().diffManifest(
new ClusterStateDiffManifest.Builder().fromStateUUID("state-uuid2").build()
).build();
when(remoteClusterStateService.getClusterMetadataManifestByFileName(any(), any())).thenReturn(metadataManifest);

doAnswer((i) -> {
remoteDownloadStats.diffDownloadState();
return null;
}).when(remoteClusterStateService).diffDownloadState();

doAnswer((i) -> {
remoteDownloadStats.fullDownloadState();
return null;
}).when(remoteClusterStateService).fullDownloadState();

PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService);
ClusterState clusterState = mock(ClusterState.class);
handler.setLastSeenClusterState(clusterState);
when(clusterState.stateUUID()).thenReturn("state-uuid");

RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
secondNode,
TERM,
VERSION,
CLUSTER_NAME,
CLUSTER_UUID,
MANIFEST_FILE
);
clusterState = buildClusterState(TERM, VERSION);
PublishRequest publishRequest = new PublishRequest(clusterState);
handler.setCurrentPublishRequestToSelf(publishRequest);
assertThrows(NullPointerException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest));
assertEquals(0, remoteDownloadStats.getDiffDownloadCount());
assertEquals(1, remoteDownloadStats.getFullDownloadCount());
}

public void testHandleIncomingRemotePublishRequestWhenManifestNotFound() throws IOException {
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -837,7 +837,10 @@ public void testRemotePersistedStateFailureStats() throws IOException {
final String previousClusterUUID = "prev-cluster-uuid";
Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any(), Mockito.any());
when(remoteClusterStateService.getUploadStats()).thenReturn(remoteStateStats);
doCallRealMethod().when(remoteClusterStateService).writeMetadataFailed();
doAnswer((i) -> {
remoteStateStats.stateFailed();
return null;
}).when(remoteClusterStateService).writeMetadataFailed();
CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID);

final long clusterTerm = randomNonNegativeLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,10 @@ public void testGetClusterStateForManifest_IncludeEphemeral() throws IOException
when(mockedResult.getComponent()).thenReturn(COORDINATION_METADATA);
RemoteClusterStateService mockService = spy(remoteClusterStateService);
mockService.getClusterStateForManifest(ClusterName.DEFAULT.value(), manifest, NODE_ID, true);

assertNotNull(remoteClusterStateService.getDownloadStats());
assertEquals(1, remoteClusterStateService.getDownloadStats().getSuccessCount());
assertEquals(0, remoteClusterStateService.getDownloadStats().getFailedCount());
verify(mockService, times(1)).readClusterStateInParallel(
any(),
eq(manifest),
Expand Down Expand Up @@ -2568,7 +2572,7 @@ public void testGetValidPreviousClusterUUIDWhenLastUUIDUncommitted() throws IOEx
assertThat(previousClusterUUID, equalTo("cluster-uuid2"));
}

public void testRemoteStateStats() throws IOException {
public void testRemoteStateUploadStats() throws IOException {
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
mockBlobStoreObjects();
remoteClusterStateService.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
}

@Override
public PersistedStateStats getUploadStats() {
public PersistedStateStats getStats() {
return null;
}

Expand Down

0 comments on commit b8f74ee

Please sign in to comment.