Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing pagination for _cat/indices API #14718

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions server/src/main/java/org/opensearch/common/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import reactor.util.annotation.NonNull;

import static java.util.Collections.emptyMap;

/**
Expand All @@ -59,9 +61,18 @@ public class Table {
private List<Cell> currentCells;
private boolean inHeaders = false;
private boolean withTime = false;
private PaginationMetadata paginationMetadata = new PaginationMetadata(false, null, null, null);
public static final String EPOCH = "epoch";
public static final String TIMESTAMP = "timestamp";

public Table() {}

public Table(@Nullable PaginationMetadata paginationMetadata) {
if (paginationMetadata != null) {
this.paginationMetadata = paginationMetadata;
}
}

public Table startHeaders() {
inHeaders = true;
currentCells = new ArrayList<>();
Expand Down Expand Up @@ -230,6 +241,22 @@ public Map<String, String> getAliasMap() {
return headerAliasMap;
}

public boolean isPaginated() {
return paginationMetadata.isResponsePaginated;
}

public String getPaginatedElement() {
return paginationMetadata.paginatedElement;
}

public String getNextToken() {
return paginationMetadata.nextToken;
}

public String getPreviousToken() {
return paginationMetadata.previousToken;
}

/**
* Cell in a table
*
Expand All @@ -254,4 +281,45 @@ public Cell(Object value, Map<String, String> attr) {
this.attr = attr;
}
}

/**
* Pagination metadata for a table.
*
* @opensearch.internal
*/
public static class PaginationMetadata {

/**
* boolean denoting whether the table is paginated or not.
*/
public final boolean isResponsePaginated;

/**
* String denoting the element which is being paginated (for e.g. shards, indices..).
*/
public final String paginatedElement;

/**
* String denoting the next_token of paginated response, which will be used to fetch next page (if any).
*/
public final String nextToken;

/**
* String denoting the previous_token of paginated response, which will be used to fetch previous page (if any).
*/
public final String previousToken;

public PaginationMetadata(
@NonNull boolean isResponsePaginated,
@Nullable String paginatedElement,
@Nullable String nextToken,
@Nullable String previousToken
) {
this.isResponsePaginated = isResponsePaginated;
assert !isResponsePaginated || paginatedElement != null : "paginatedElement must be specified for a table which is paginated";
this.paginatedElement = paginatedElement;
this.nextToken = nextToken;
this.previousToken = previousToken;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
import org.opensearch.rest.action.RestResponseListener;
import org.opensearch.rest.pagination.IndexBasedPaginationStrategy;

import java.time.Instant;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -95,10 +96,18 @@ public class RestIndicesAction extends AbstractCatAction {
"Parameter [master_timeout] is deprecated and will be removed in 3.0. To support inclusive language, please use [cluster_manager_timeout] instead.";
private static final String DUPLICATE_PARAMETER_ERROR_MESSAGE =
"Please only use one of the request parameters [master_timeout, cluster_manager_timeout].";
private static final String DEFAULT_CAT_INDICES_PAGE_SIZE_STRING = "1000";

@Override
public List<Route> routes() {
return unmodifiableList(asList(new Route(GET, "/_cat/indices"), new Route(GET, "/_cat/indices/{index}")));
return unmodifiableList(
asList(
new Route(GET, "/_cat/indices"),
new Route(GET, "/_cat/indices/{index}"),
new Route(GET, "/_cat/V2/indices"),
new Route(GET, "/_cat/V2/indices/{index}")
)
);
}

@Override
Expand Down Expand Up @@ -131,9 +140,13 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
}
clusterManagerTimeout = request.paramAsTime("master_timeout", DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT);
}
// Check for a paginated query.
if (request.path().contains("/V2/indices")) {
return doPaginatedCatRequest(request, client, clusterManagerTimeout, indices);
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
}

final TimeValue clusterManagerNodeTimeout = clusterManagerTimeout;
final boolean includeUnloadedSegments = request.paramAsBoolean("include_unloaded_segments", false);

return channel -> {
final ActionListener<Table> listener = ActionListener.notifyOnce(new RestResponseListener<Table>(channel) {
@Override
Expand Down Expand Up @@ -205,6 +218,91 @@ public void onFailure(final Exception e) {
};
}

public RestChannelConsumer doPaginatedCatRequest(
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
final RestRequest request,
final NodeClient client,
final TimeValue clusterManagerNodeTimeout,
final String[] indices
) {
final boolean local = request.paramAsBoolean("local", false);
final boolean includeUnloadedSegments = request.paramAsBoolean("include_unloaded_segments", false);
final String requestedToken = request.param("next_token");
IndexBasedPaginationStrategy.validateRequestedRequest(requestedToken);
final int pageSize = Integer.parseInt(request.param("max_page_size", DEFAULT_CAT_INDICES_PAGE_SIZE_STRING));
final boolean latestIndicesFirst = request.paramAsBoolean("latest_indices_first", false);

return channel -> {
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
final ActionListener<Table> listener = ActionListener.notifyOnce(new RestResponseListener<Table>(channel) {
@Override
public RestResponse buildResponse(final Table table) throws Exception {
return RestTable.buildResponse(table, channel);
}
});

// Fetch all the indices from clusterStateRequest for a paginated query.
sendClusterStateRequest(
indices,
IndicesOptions.lenientExpandHidden(),
local,
clusterManagerNodeTimeout,
client,
new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(final ClusterStateResponse clusterStateResponse) {
IndexBasedPaginationStrategy paginationStrategy = new IndexBasedPaginationStrategy(
requestedToken,
pageSize,
latestIndicesFirst,
clusterStateResponse.getState()
);

final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(
request,
4,
listener,
new Table.PaginationMetadata(
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
true,
"indices",
paginationStrategy.getNextToken(),
paginationStrategy.getPreviousToken()
)
);
groupedListener.onResponse(clusterStateResponse);
final String[] indicesToBeQueried = paginationStrategy.getPageElements().toArray(new String[0]);
sendGetSettingsRequest(
indicesToBeQueried,
IndicesOptions.fromRequest(request, IndicesOptions.strictExpand()),
local,
clusterManagerNodeTimeout,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
sendIndicesStatsRequest(
indicesToBeQueried,
IndicesOptions.lenientExpandHidden(),
includeUnloadedSegments,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
sendClusterHealthRequest(
indicesToBeQueried,
IndicesOptions.lenientExpandHidden(),
local,
clusterManagerNodeTimeout,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
}
);
};
}

/**
* We're using the Get Settings API here to resolve the authorized indices for the user.
* This is because the Cluster State and Cluster Health APIs do not filter output based
Expand Down Expand Up @@ -288,6 +386,15 @@ private GroupedActionListener<ActionResponse> createGroupedListener(
final RestRequest request,
final int size,
final ActionListener<Table> listener
) {
return createGroupedListener(request, size, listener, null);
}

private GroupedActionListener<ActionResponse> createGroupedListener(
final RestRequest request,
final int size,
final ActionListener<Table> listener,
final Table.PaginationMetadata paginationMetadata
) {
return new GroupedActionListener<>(new ActionListener<Collection<ActionResponse>>() {
@Override
Expand All @@ -311,7 +418,14 @@ public void onResponse(final Collection<ActionResponse> responses) {
IndicesStatsResponse statsResponse = extractResponse(responses, IndicesStatsResponse.class);
Map<String, IndexStats> indicesStats = statsResponse.getIndices();

Table responseTable = buildTable(request, indicesSettings, indicesHealths, indicesStats, indicesStates);
Table responseTable = buildTable(
request,
indicesSettings,
indicesHealths,
indicesStats,
indicesStates,
paginationMetadata
);
listener.onResponse(responseTable);
} catch (Exception e) {
onFailure(e);
Expand Down Expand Up @@ -340,7 +454,11 @@ protected Set<String> responseParams() {

@Override
protected Table getTableWithHeader(final RestRequest request) {
Table table = new Table();
return getTableWithHeader(request, null);
}

protected Table getTableWithHeader(final RestRequest request, final Table.PaginationMetadata paginationMetadata) {
Table table = new Table(paginationMetadata);
table.startHeaders();
table.addCell("health", "alias:h;desc:current health status");
table.addCell("status", "alias:s;desc:open/close status");
Expand Down Expand Up @@ -709,11 +827,12 @@ Table buildTable(
final Map<String, Settings> indicesSettings,
final Map<String, ClusterIndexHealth> indicesHealths,
final Map<String, IndexStats> indicesStats,
final Map<String, IndexMetadata> indicesMetadatas
final Map<String, IndexMetadata> indicesMetadatas,
final Table.PaginationMetadata paginationMetadata
) {

final String healthParam = request.param("health");
final Table table = getTableWithHeader(request);
final Table table = getTableWithHeader(request, paginationMetadata);

indicesSettings.forEach((indexName, settings) -> {
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
if (indicesMetadatas.containsKey(indexName) == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,15 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel
XContentBuilder builder = channel.newBuilder();
List<DisplayHeader> displayHeaders = buildDisplayHeaders(table, request);

builder.startArray();
if (table.isPaginated()) {
assert table.getPaginatedElement() != null : "Paginated element is required in-case nextToken is not null";
builder.startObject();
builder.field("previous_token", table.getPreviousToken());
builder.field("next_token", table.getNextToken());
builder.startArray(table.getPaginatedElement());
} else {
builder.startArray();
}
List<Integer> rowOrder = getRowOrder(table, request);
gargharsh3134 marked this conversation as resolved.
Show resolved Hide resolved
for (Integer row : rowOrder) {
builder.startObject();
Expand All @@ -98,6 +106,9 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel
builder.endObject();
}
builder.endArray();
if (table.isPaginated()) {
builder.endObject();
}
return new BytesRestResponse(RestStatus.OK, builder);
}

Expand Down Expand Up @@ -136,6 +147,13 @@ public static RestResponse buildTextPlainResponse(Table table, RestChannel chann
}
out.append("\n");
}
// Adding a nextToken row, post an empty line, in the response if the table is paginated.
if (table.isPaginated()) {
out.append("previous_token" + " " + table.getPreviousToken());
out.append("\n");
out.append("next_token" + " " + table.getNextToken());
out.append("\n");
}
out.close();
return new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, bytesOut.bytes());
}
Expand Down
Loading
Loading