Skip to content

Commit

Permalink
Refactoring strategy to simplify generating list of requested indices
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <gkharsh@amazon.com>
  • Loading branch information
Harsh Garg committed Sep 19, 2024
1 parent 0621975 commit 3bedc82
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class RestIndicesListAction extends RestIndicesAction {

protected static final int MAX_SUPPORTED_LIST_INDICES_PAGE_SIZE_STRING = 1000;
protected static final int DEFAULT_LIST_INDICES_PAGE_SIZE_STRING = 1000;
protected static final String PAGINATED_LIST_INDICES_ELEMENT_KEY = "indices";

@Override
public List<Route> routes() {
Expand Down Expand Up @@ -108,7 +107,7 @@ protected Table buildTable(

@Override
protected IndexBasedPaginationStrategy getPaginationStrategy(ClusterStateResponse clusterStateResponse) {
return new IndexBasedPaginationStrategy(paginatedQueryRequest, PAGINATED_LIST_INDICES_ELEMENT_KEY, clusterStateResponse.getState());
return new IndexBasedPaginationStrategy(paginatedQueryRequest, clusterStateResponse.getState());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
import org.opensearch.common.Nullable;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.opensearch.rest.pagination.PaginatedQueryRequest.PAGINATED_QUERY_ASCENDING_SORT;

/**
* This strategy can be used by the Rest APIs wanting to paginate the responses based on Indices.
* The strategy considers create timestamps of indices as the keys to iterate over pages.
Expand All @@ -29,102 +33,79 @@ public class IndexBasedPaginationStrategy implements PaginationStrategy<String>
private final PaginatedQueryResponse paginatedQueryResponse;
private final List<String> indicesFromRequestedToken;

public IndexBasedPaginationStrategy(PaginatedQueryRequest paginatedQueryRequest, String paginatedElement, ClusterState clusterState) {
// Get list of indices metadata sorted by their creation time
List<IndexMetadata> sortedIndicesList = PaginationStrategy.getListOfIndicesSortedByCreateTime(
private static final String DEFAULT_INDICES_PAGINATED_ELEMENT = "indices";

public IndexBasedPaginationStrategy(PaginatedQueryRequest paginatedQueryRequest, ClusterState clusterState) {
// Get list of indices metadata sorted by their creation time and filtered by the last send index
List<IndexMetadata> sortedIndicesList = PaginationStrategy.getSortedIndexMetadata(
clusterState,
paginatedQueryRequest.getSort()
getMetadataListFilter(paginatedQueryRequest.getRequestedTokenStr(), paginatedQueryRequest.getSort()),
getMetadataListComparator(paginatedQueryRequest.getSort())
);
this.indicesFromRequestedToken = getIndicesFromRequestedToken(sortedIndicesList, paginatedQueryRequest);
this.paginatedQueryResponse = getPaginatedResponseFromRequestedToken(sortedIndicesList, paginatedQueryRequest, paginatedElement);
List<IndexMetadata> metadataListForRequestedToken = getMetadataListForRequestedToken(sortedIndicesList, paginatedQueryRequest);
this.indicesFromRequestedToken = metadataListForRequestedToken.stream()
.map(metadata -> metadata.getIndex().getName())
.collect(Collectors.toList());
this.paginatedQueryResponse = getPaginatedResponseForRequestedToken(paginatedQueryRequest.getSize(), sortedIndicesList);
}

private List<String> getIndicesFromRequestedToken(List<IndexMetadata> sortedIndicesList, PaginatedQueryRequest paginatedQueryRequest) {
if (sortedIndicesList.isEmpty()) {
return new ArrayList<>();
private static Predicate<IndexMetadata> getMetadataListFilter(String requestedTokenStr, String sortOrder) {
boolean isAscendingSort = sortOrder.equals(PAGINATED_QUERY_ASCENDING_SORT);
IndexStrategyToken requestedToken = Objects.isNull(requestedTokenStr) || requestedTokenStr.isEmpty()
? null
: new IndexStrategyToken(requestedTokenStr);
if (Objects.isNull(requestedToken)) {
return indexMetadata -> true;
}
final int requestedPageStartIndexNumber = getRequestedPageIndexStartNumber(paginatedQueryRequest, sortedIndicesList); // inclusive
int requestedPageEndIndexNumber = Math.min(
requestedPageStartIndexNumber + paginatedQueryRequest.getSize(),
sortedIndicesList.size()
); // exclusive
return sortedIndicesList.subList(requestedPageStartIndexNumber, requestedPageEndIndexNumber)
.stream()
.map(indexMetadata -> indexMetadata.getIndex().getName())
.collect(Collectors.toList());
return indexMetadata -> {
if (indexMetadata.getIndex().getName().equals(requestedToken.nameOfLastRespondedIndex)) {
return false;
} else if (indexMetadata.getCreationDate() == requestedToken.creationTimeOfLastRespondedIndex) {
return isAscendingSort
? indexMetadata.getIndex().getName().compareTo(requestedToken.nameOfLastRespondedIndex) > 0
: indexMetadata.getIndex().getName().compareTo(requestedToken.nameOfLastRespondedIndex) < 0;
}
return isAscendingSort
? indexMetadata.getCreationDate() > requestedToken.creationTimeOfLastRespondedIndex
: indexMetadata.getCreationDate() < requestedToken.creationTimeOfLastRespondedIndex;
};
}

private static Comparator<IndexMetadata> getMetadataListComparator(String sortOrder) {
boolean isAscendingSort = sortOrder.equals(PAGINATED_QUERY_ASCENDING_SORT);
return (metadata1, metadata2) -> {
if (metadata1.getCreationDate() == metadata2.getCreationDate()) {
return isAscendingSort
? metadata1.getIndex().getName().compareTo(metadata2.getIndex().getName())
: metadata2.getIndex().getName().compareTo(metadata1.getIndex().getName());
}
return isAscendingSort
? Long.compare(metadata1.getCreationDate(), metadata2.getCreationDate())
: Long.compare(metadata2.getCreationDate(), metadata1.getCreationDate());
};
}

private PaginatedQueryResponse getPaginatedResponseFromRequestedToken(
private List<IndexMetadata> getMetadataListForRequestedToken(
List<IndexMetadata> sortedIndicesList,
PaginatedQueryRequest paginatedQueryRequest,
String paginatedElement
PaginatedQueryRequest paginatedQueryRequest
) {
if (sortedIndicesList.isEmpty()) {
return new PaginatedQueryResponse(null, paginatedElement);
return new ArrayList<>();
}
int positionToStartNextPage = Math.min(
getRequestedPageIndexStartNumber(paginatedQueryRequest, sortedIndicesList) + paginatedQueryRequest.getSize(),
sortedIndicesList.size()
);
return new PaginatedQueryResponse(
positionToStartNextPage >= sortedIndicesList.size()
? null
: new IndexStrategyToken(
positionToStartNextPage,
sortedIndicesList.get(positionToStartNextPage - 1).getCreationDate(),
sortedIndicesList.get(positionToStartNextPage - 1).getIndex().getName()
).generateEncryptedToken(),
paginatedElement
);
return sortedIndicesList.subList(0, Math.min(paginatedQueryRequest.getSize(), sortedIndicesList.size()));
}

private int getRequestedPageIndexStartNumber(PaginatedQueryRequest paginatedQueryRequest, List<IndexMetadata> sortedIndicesList) {
if (Objects.isNull(paginatedQueryRequest.getRequestedTokenStr())) {
// first paginated query, start from first index.
return 0;
}

IndexStrategyToken requestedToken = new IndexStrategyToken(paginatedQueryRequest.getRequestedTokenStr());
// If the already requested indices have been deleted, the position to start in the last token could be
// greater than the sorted list's size, hence limiting it to current list's size.
int requestedPageStartIndexNumber = Math.min(requestedToken.posToStartPage, sortedIndicesList.size());
IndexMetadata currentIndexAtLastSentPosition = sortedIndicesList.get(requestedPageStartIndexNumber - 1);

if (!Objects.equals(currentIndexAtLastSentPosition.getIndex().getName(), requestedToken.nameOfLastRespondedIndex)) {
// case denoting already responded index/indices has/have been deleted/added in between the paginated queries.
// find the index whose creation time is just after/before (based on sortOrder) the index which was last responded.
if (!DESCENDING_SORT_PARAM_VALUE.equals(paginatedQueryRequest.getSort())) {
// For ascending sort order, if indices were deleted, the index to start current page could only have
// moved upwards (at a smaller position) in the sorted list. Traverse backwards to find such index
while (requestedPageStartIndexNumber > 0
&& (sortedIndicesList.get(requestedPageStartIndexNumber - 1)
.getCreationDate() > requestedToken.creationTimeOfLastRespondedIndex)) {
requestedPageStartIndexNumber--;
}
} else {
// For descending order, there could be following 2 possibilities:
// 1. Number of already responded indices which got deleted is greater than newly created ones.
// -> The index to start the page from, would have shifted up in the list. Traverse backwards to find it.
// 2. Number of indices which got created is greater than number of already responded indices which got deleted.
// -> The index to start the page from, would have shifted down in the list. Traverse forward to find it.
boolean traverseForward = currentIndexAtLastSentPosition
.getCreationDate() >= requestedToken.creationTimeOfLastRespondedIndex;
if (traverseForward) {
while (requestedPageStartIndexNumber < sortedIndicesList.size()
&& (sortedIndicesList.get(requestedPageStartIndexNumber - 1)
.getCreationDate() > requestedToken.creationTimeOfLastRespondedIndex)) {
requestedPageStartIndexNumber++;
}
} else {
while (requestedPageStartIndexNumber > 0
&& (sortedIndicesList.get(requestedPageStartIndexNumber - 1)
.getCreationDate() < requestedToken.creationTimeOfLastRespondedIndex)) {
requestedPageStartIndexNumber--;
}
}
}
private PaginatedQueryResponse getPaginatedResponseForRequestedToken(int pageSize, List<IndexMetadata> sortedIndicesList) {
if (sortedIndicesList.size() <= pageSize) {
return new PaginatedQueryResponse(null, DEFAULT_INDICES_PAGINATED_ELEMENT);
}
return requestedPageStartIndexNumber;
return new PaginatedQueryResponse(
new IndexStrategyToken(
sortedIndicesList.get(pageSize - 1).getCreationDate(),
sortedIndicesList.get(pageSize - 1).getIndex().getName()
).generateEncryptedToken(),
DEFAULT_INDICES_PAGINATED_ELEMENT
);
}

@Override
Expand All @@ -140,20 +121,15 @@ public List<String> getElementsFromRequestedToken() {

/**
* TokenParser to be used by {@link IndexBasedPaginationStrategy}.
* Token would like: IndexNumberToStartTheNextPageFrom + $ + CreationTimeOfLastRespondedIndex + $ +
* QueryStartTime + $ + NameOfLastRespondedIndex
* Token would like: IndexNumberToStartTheNextPageFrom + | + CreationTimeOfLastRespondedIndex + | +
* QueryStartTime + | + NameOfLastRespondedIndex
*/
public static class IndexStrategyToken {

private static final String TOKEN_JOIN_DELIMITER = "|";
private static final String TOKEN_SPLIT_REGEX = "\\|";
private static final int START_PAGE_FIELD_POSITION_IN_TOKEN = 0;
private static final int CREATION_TIME_FIELD_POSITION_IN_TOKEN = 1;

/**
* Denotes the position in the sorted list of indices to start building the page from.
*/
private final int posToStartPage;
private static final int CREATION_TIME_FIELD_POSITION_IN_TOKEN = 0;
private static final int INDEX_NAME_FIELD_POSITION_IN_TOKEN = 1;

/**
* Represents creation times of last index which was displayed in the previous page.
Expand All @@ -171,26 +147,19 @@ public IndexStrategyToken(String requestedTokenString) {
validateIndexStrategyToken(requestedTokenString);
String decryptedToken = PaginationStrategy.decryptStringToken(requestedTokenString);
final String[] decryptedTokenElements = decryptedToken.split(TOKEN_SPLIT_REGEX);
this.posToStartPage = Integer.parseInt(decryptedTokenElements[START_PAGE_FIELD_POSITION_IN_TOKEN]);
this.creationTimeOfLastRespondedIndex = Long.parseLong(decryptedTokenElements[CREATION_TIME_FIELD_POSITION_IN_TOKEN]);
this.nameOfLastRespondedIndex = decryptedTokenElements[2];
this.nameOfLastRespondedIndex = decryptedTokenElements[INDEX_NAME_FIELD_POSITION_IN_TOKEN];
}

public IndexStrategyToken(int indexNumberToStartPageFrom, long creationTimeOfLastRespondedIndex, String nameOfLastRespondedIndex) {
public IndexStrategyToken(long creationTimeOfLastRespondedIndex, String nameOfLastRespondedIndex) {
Objects.requireNonNull(nameOfLastRespondedIndex, "index name should be provided");
this.posToStartPage = indexNumberToStartPageFrom;
this.creationTimeOfLastRespondedIndex = creationTimeOfLastRespondedIndex;
this.nameOfLastRespondedIndex = nameOfLastRespondedIndex;
}

public String generateEncryptedToken() {
return PaginationStrategy.encryptStringToken(
String.join(
TOKEN_JOIN_DELIMITER,
String.valueOf(posToStartPage),
String.valueOf(creationTimeOfLastRespondedIndex),
nameOfLastRespondedIndex
)
String.join(TOKEN_JOIN_DELIMITER, String.valueOf(creationTimeOfLastRespondedIndex), nameOfLastRespondedIndex)
);
}

Expand All @@ -205,13 +174,12 @@ public static void validateIndexStrategyToken(String requestedTokenString) {
Objects.requireNonNull(requestedTokenString, "requestedTokenString can not be null");
String decryptedToken = PaginationStrategy.decryptStringToken(requestedTokenString);
final String[] decryptedTokenElements = decryptedToken.split(TOKEN_SPLIT_REGEX);
if (decryptedTokenElements.length != 3) {
if (decryptedTokenElements.length != 2) {
throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE);
}
try {
int posToStartPage = Integer.parseInt(decryptedTokenElements[0]);
long creationTimeOfLastRespondedIndex = Long.parseLong(decryptedTokenElements[1]);
if (posToStartPage <= 0 || creationTimeOfLastRespondedIndex <= 0) {
long creationTimeOfLastRespondedIndex = Long.parseLong(decryptedTokenElements[CREATION_TIME_FIELD_POSITION_IN_TOKEN]);
if (creationTimeOfLastRespondedIndex <= 0) {
throw new OpenSearchParseException(INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE);
}
} catch (NumberFormatException exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@
*
* Class specific to paginated queries, which will contain common query params required by a paginated API.
*/
@PublicApi(since = "2.17.0")
@PublicApi(since = "3.0.0")
public class PaginatedQueryRequest {

public static final String PAGINATED_QUERY_PARAM_SORT_KEY = "sort";
public static final String PAGINATED_QUERY_PARAM_NEXT_TOKEN_KEY = "next_token";
public static final String PAGINATED_QUERY_PARAM_SIZE_KEY = "size";
public static final String PAGINATED_QUERY_ASCENDING_SORT = "ascending";

private final String requestedTokenStr;
private final String sort;
private final int size;

public PaginatedQueryRequest(String requested_token, String sort, int size) {
this.requestedTokenStr = requested_token;
public PaginatedQueryRequest(String requestedToken, String sort, int size) {
this.requestedTokenStr = requestedToken;
this.sort = sort;
this.size = size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import org.opensearch.cluster.metadata.IndexMetadata;

import java.util.Base64;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.nio.charset.StandardCharsets.UTF_8;
Expand All @@ -26,7 +28,6 @@
*/
public interface PaginationStrategy<T> {

String DESCENDING_SORT_PARAM_VALUE = "descending";
String INCORRECT_TAINTED_NEXT_TOKEN_ERROR_MESSAGE =
"Parameter [next_token] has been tainted and is incorrect. Please provide a valid [next_token].";

Expand All @@ -44,19 +45,14 @@ public interface PaginationStrategy<T> {

/**
*
* Utility method to get list of indices sorted by their creation time.
* Utility method to get list of indices filtered as per {@param filterPredicate} and the sorted according to {@param comparator}.
*/
static List<IndexMetadata> getListOfIndicesSortedByCreateTime(final ClusterState clusterState, String sortOrder) {
return clusterState.metadata().indices().values().stream().sorted((metadata1, metadata2) -> {
if (metadata1.getCreationDate() == metadata2.getCreationDate()) {
return DESCENDING_SORT_PARAM_VALUE.equals(sortOrder)
? metadata2.getIndex().getName().compareTo(metadata1.getIndex().getName())
: metadata1.getIndex().getName().compareTo(metadata2.getIndex().getName());
}
return DESCENDING_SORT_PARAM_VALUE.equals(sortOrder)
? Long.compare(metadata2.getCreationDate(), metadata1.getCreationDate())
: Long.compare(metadata1.getCreationDate(), metadata2.getCreationDate());
}).collect(Collectors.toList());
static List<IndexMetadata> getSortedIndexMetadata(
final ClusterState clusterState,
Predicate<IndexMetadata> filterPredicate,
Comparator<IndexMetadata> comparator
) {
return clusterState.metadata().indices().values().stream().filter(filterPredicate).sorted(comparator).collect(Collectors.toList());
}

static String encryptStringToken(String tokenString) {
Expand Down
Loading

0 comments on commit 3bedc82

Please sign in to comment.