Skip to content

Commit

Permalink
Implementing pagination for _cat/shards (#14641)
Browse files Browse the repository at this point in the history
* Adding _list/shards API

Signed-off-by: Harsh Garg <gkharsh@amazon.com>
  • Loading branch information
gargharsh3134 authored and Harsh Garg committed Oct 21, 2024
1 parent 96fdbfd commit fb7250c
Show file tree
Hide file tree
Showing 34 changed files with 1,691 additions and 186 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986))
- Flat object field use IndexOrDocValuesQuery to optimize query ([#14383](https://github.com/opensearch-project/OpenSearch/issues/14383))
- Add method to return dynamic SecureTransportParameters from SecureTransportSettingsProvider interface ([#16387](https://github.com/opensearch-project/OpenSearch/pull/16387)
- Add _list/shards API as paginated alternate to _cat/shards ([#14641](https://github.com/opensearch-project/OpenSearch/pull/14641))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.23.1 to 2.24.0 ([#15858](https://github.com/opensearch-project/OpenSearch/pull/15858))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -51,9 +50,9 @@ public void testCatShardsWithSuccessResponse() throws InterruptedException {
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
@Override
public void onResponse(CatShardsResponse catShardsResponse) {
ClusterStateResponse clusterStateResponse = catShardsResponse.getClusterStateResponse();
List<ShardRouting> shardRoutings = catShardsResponse.getResponseShards();
IndicesStatsResponse indicesStatsResponse = catShardsResponse.getIndicesStatsResponse();
for (ShardRouting shard : clusterStateResponse.getState().routingTable().allShards()) {
for (ShardRouting shard : shardRoutings) {
assertEquals("test", shard.getIndexName());
assertNotNull(indicesStatsResponse.asMap().get(shard));
}
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@
import org.opensearch.rest.action.list.AbstractListAction;
import org.opensearch.rest.action.list.RestIndicesListAction;
import org.opensearch.rest.action.list.RestListAction;
import org.opensearch.rest.action.list.RestShardsListAction;
import org.opensearch.rest.action.search.RestClearScrollAction;
import org.opensearch.rest.action.search.RestCountAction;
import org.opensearch.rest.action.search.RestCreatePitAction;
Expand Down Expand Up @@ -979,6 +980,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {

// LIST API
registerHandler.accept(new RestIndicesListAction(responseLimitSettings));
registerHandler.accept(new RestShardsListAction());

// Point in time API
registerHandler.accept(new RestCreatePitAction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.pagination.PageParams;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.rest.action.admin.cluster.ClusterAdminTask;

Expand All @@ -27,13 +30,39 @@ public class CatShardsRequest extends ClusterManagerNodeReadRequest<CatShardsReq

private String[] indices;
private TimeValue cancelAfterTimeInterval;
private PageParams pageParams = null;
private boolean requestLimitCheckSupported;

public CatShardsRequest() {}

public CatShardsRequest(StreamInput in) throws IOException {
super(in);
this.requestLimitCheckSupported = false;
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
indices = in.readStringArray();
cancelAfterTimeInterval = in.readOptionalTimeValue();
if (in.readBoolean()) {
pageParams = new PageParams(in);
}
requestLimitCheckSupported = in.readBoolean();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
if (indices == null) {
out.writeVInt(0);
} else {
out.writeStringArray(indices);
}
out.writeOptionalTimeValue(cancelAfterTimeInterval);
out.writeBoolean(pageParams != null);
if (pageParams != null) {
pageParams.writeTo(out);
}
out.writeBoolean(requestLimitCheckSupported);
}
}

@Override
Expand All @@ -57,6 +86,14 @@ public TimeValue getCancelAfterTimeInterval() {
return this.cancelAfterTimeInterval;
}

public void setPageParams(PageParams pageParams) {
this.pageParams = pageParams;
}

public PageParams getPageParams() {
return pageParams;
}

public void setRequestLimitCheckSupported(final boolean requestLimitCheckSupported) {
this.requestLimitCheckSupported = requestLimitCheckSupported;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.Version;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.pagination.PageToken;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* A response of a cat shards request.
Expand All @@ -23,28 +28,44 @@
*/
public class CatShardsResponse extends ActionResponse {

private ClusterStateResponse clusterStateResponse = null;

private IndicesStatsResponse indicesStatsResponse = null;
private IndicesStatsResponse indicesStatsResponse;
private DiscoveryNodes nodes = DiscoveryNodes.EMPTY_NODES;
private List<ShardRouting> responseShards = new ArrayList<>();
private PageToken pageToken;

public CatShardsResponse() {}

public CatShardsResponse(StreamInput in) throws IOException {
super(in);
indicesStatsResponse = new IndicesStatsResponse(in);
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
nodes = DiscoveryNodes.readFrom(in, null);
responseShards = in.readList(ShardRouting::new);
if (in.readBoolean()) {
pageToken = new PageToken(in);
}
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterStateResponse.writeTo(out);
indicesStatsResponse.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
nodes.writeToWithAttribute(out);
out.writeList(responseShards);
out.writeBoolean(pageToken != null);
if (pageToken != null) {
pageToken.writeTo(out);
}
}
}

public void setClusterStateResponse(ClusterStateResponse clusterStateResponse) {
this.clusterStateResponse = clusterStateResponse;
public void setNodes(DiscoveryNodes nodes) {
this.nodes = nodes;
}

public ClusterStateResponse getClusterStateResponse() {
return this.clusterStateResponse;
public DiscoveryNodes getNodes() {
return this.nodes;
}

public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) {
Expand All @@ -54,4 +75,20 @@ public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) {
public IndicesStatsResponse getIndicesStatsResponse() {
return this.indicesStatsResponse;
}

public void setResponseShards(List<ShardRouting> responseShards) {
this.responseShards = responseShards;
}

Check warning on line 81 in server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsResponse.java#L80-L81

Added lines #L80 - L81 were not covered by tests

public List<ShardRouting> getResponseShards() {
return this.responseShards;
}

public void setPageToken(PageToken pageToken) {
this.pageToken = pageToken;
}

public PageToken getPageToken() {
return this.pageToken;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.pagination.PageParams;
import org.opensearch.action.pagination.ShardPaginationStrategy;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.TimeoutTaskCancellationUtility;
Expand Down Expand Up @@ -57,7 +59,11 @@ public void doExecute(Task parentTask, CatShardsRequest shardsRequest, ActionLis
clusterStateRequest.setShouldCancelOnTimeout(true);
clusterStateRequest.local(shardsRequest.local());
clusterStateRequest.clusterManagerNodeTimeout(shardsRequest.clusterManagerNodeTimeout());
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices());
if (Objects.isNull(shardsRequest.getPageParams())) {
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices());

Check warning on line 63 in server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java#L63

Added line #L63 was not covered by tests
} else {
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices()).metadata(true);

Check warning on line 65 in server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java#L65

Added line #L65 was not covered by tests
}
assert parentTask instanceof CancellableTask;
clusterStateRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());

Expand Down Expand Up @@ -87,13 +93,26 @@ protected void innerOnFailure(Exception e) {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
validateRequestLimit(shardsRequest, clusterStateResponse, cancellableListener);
catShardsResponse.setClusterStateResponse(clusterStateResponse);
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setShouldCancelOnTimeout(true);
indicesStatsRequest.all();
indicesStatsRequest.indices(shardsRequest.getIndices());
indicesStatsRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());
try {
ShardPaginationStrategy paginationStrategy = getPaginationStrategy(
shardsRequest.getPageParams(),

Check warning on line 98 in server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java#L97-L98

Added lines #L97 - L98 were not covered by tests
clusterStateResponse
);
String[] indices = Objects.isNull(paginationStrategy)
? shardsRequest.getIndices()
: paginationStrategy.getRequestedIndices().toArray(new String[0]);
catShardsResponse.setNodes(clusterStateResponse.getState().getNodes());
catShardsResponse.setResponseShards(

Check warning on line 105 in server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java#L102-L105

Added lines #L102 - L105 were not covered by tests
Objects.isNull(paginationStrategy)
? clusterStateResponse.getState().routingTable().allShards()
: paginationStrategy.getRequestedEntities()

Check warning on line 108 in server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java#L107-L108

Added lines #L107 - L108 were not covered by tests
);
catShardsResponse.setPageToken(Objects.isNull(paginationStrategy) ? null : paginationStrategy.getResponseToken());
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setShouldCancelOnTimeout(true);
indicesStatsRequest.all();
indicesStatsRequest.indices(indices);
indicesStatsRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());

Check warning on line 115 in server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java#L111-L115

Added lines #L111 - L115 were not covered by tests
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
Expand Down Expand Up @@ -122,6 +141,10 @@ public void onFailure(Exception e) {

}

private ShardPaginationStrategy getPaginationStrategy(PageParams pageParams, ClusterStateResponse clusterStateResponse) {
return Objects.isNull(pageParams) ? null : new ShardPaginationStrategy(pageParams, clusterStateResponse.getState());
}

private void validateRequestLimit(
final CatShardsRequest shardsRequest,
final ClusterStateResponse clusterStateResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class IndicesStatsResponse extends BroadcastResponse {

private Map<ShardRouting, ShardStats> shardStatsMap;

IndicesStatsResponse(StreamInput in) throws IOException {
public IndicesStatsResponse(StreamInput in) throws IOException {
super(in);
shards = in.readArray(ShardStats::new, (size) -> new ShardStats[size]);
}
Expand Down
Loading

0 comments on commit fb7250c

Please sign in to comment.