Skip to content

Commit

Permalink
Addressing review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed May 2, 2022
1 parent de5c4e4 commit 3359986
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@

package org.opensearch.search.slice;

import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;

import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.CreatePITAction;
import org.opensearch.action.search.CreatePITRequest;
import org.opensearch.action.search.CreatePITResponse;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
Expand All @@ -46,6 +50,7 @@
import org.opensearch.search.Scroll;
import org.opensearch.search.SearchException;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.SortBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand Down Expand Up @@ -129,6 +134,77 @@ public void testSearchSort() throws Exception {
}
}

public void testSearchSortWithoutPitOrScroll() throws Exception {
int numShards = randomIntBetween(1, 7);
int numDocs = randomIntBetween(100, 1000);
setupIndex(numDocs, numShards);
int fetchSize = randomIntBetween(10, 100);
SearchRequestBuilder request = client().prepareSearch("test")
.setQuery(matchAllQuery())
.setSize(fetchSize)
.addSort(SortBuilders.fieldSort("_doc"));
SliceBuilder sliceBuilder = new SliceBuilder("_id", 0, 4);
SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, () -> request.slice(sliceBuilder).get());
assertTrue(ex.getMessage().contains("all shards failed"));
}

public void testSearchSortWithPIT() throws Exception {
int numShards = randomIntBetween(1, 7);
int numDocs = randomIntBetween(100, 1000);
setupIndex(numDocs, numShards);
int max = randomIntBetween(2, numShards * 3);
CreatePITRequest pitRequest = new CreatePITRequest(TimeValue.timeValueDays(1), true);
pitRequest.setIndices(new String[] { "test" });
ActionFuture<CreatePITResponse> execute = client().execute(CreatePITAction.INSTANCE, pitRequest);
CreatePITResponse pitResponse = execute.get();
for (String field : new String[] { "_id", "random_int", "static_int" }) {
int fetchSize = randomIntBetween(10, 100);

// test _doc sort
SearchRequestBuilder request = client().prepareSearch("test")
.setQuery(matchAllQuery())
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()))
.setSize(fetchSize)
.addSort(SortBuilders.fieldSort("_doc"));
assertSearchSlicesWithPIT(request, field, max, numDocs);

// test numeric sort
request = client().prepareSearch("test")
.setQuery(matchAllQuery())
.setPointInTime(new PointInTimeBuilder(pitResponse.getId()))
.setSize(fetchSize)
.addSort(SortBuilders.fieldSort("random_int"));
assertSearchSlicesWithPIT(request, field, max, numDocs);
}
}

private void assertSearchSlicesWithPIT(SearchRequestBuilder request, String field, int numSlice, int numDocs) {
int totalResults = 0;
List<String> keys = new ArrayList<>();
for (int id = 0; id < numSlice; id++) {
SliceBuilder sliceBuilder = new SliceBuilder(field, id, numSlice);
SearchResponse searchResponse = request.slice(sliceBuilder).setFrom(0).get();
totalResults += searchResponse.getHits().getHits().length;
int expectedSliceResults = (int) searchResponse.getHits().getTotalHits().value;
int numSliceResults = searchResponse.getHits().getHits().length;
for (SearchHit hit : searchResponse.getHits().getHits()) {
assertTrue(keys.add(hit.getId()));
}
while (searchResponse.getHits().getHits().length > 0) {
searchResponse = request.setFrom(numSliceResults).slice(sliceBuilder).get();
totalResults += searchResponse.getHits().getHits().length;
numSliceResults += searchResponse.getHits().getHits().length;
for (SearchHit hit : searchResponse.getHits().getHits()) {
assertTrue(keys.add(hit.getId()));
}
}
assertThat(numSliceResults, equalTo(expectedSliceResults));
}
assertThat(totalResults, equalTo(numDocs));
assertThat(keys.size(), equalTo(numDocs));
assertThat(new HashSet(keys).size(), equalTo(numDocs));
}

public void testWithPreferenceAndRoutings() throws Exception {
int numShards = 10;
int totalDocs = randomIntBetween(100, 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.transport.Transport;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
Expand All @@ -42,7 +43,7 @@
* Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request and
* fail user request if any of the updates in this phase are failed - we clean up PITs in case of such failures
*/
public class PITController implements Runnable {
public class CreatePITController implements Runnable {
private final Runnable runner;
private final SearchTransportService searchTransportService;
private final ClusterService clusterService;
Expand All @@ -51,14 +52,14 @@ public class PITController implements Runnable {
private final Task task;
private final ActionListener<CreatePITResponse> listener;
private final CreatePITRequest request;
private static final Logger logger = LogManager.getLogger(PITController.class);
private static final Logger logger = LogManager.getLogger(CreatePITController.class);
public static final Setting<TimeValue> CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING = Setting.positiveTimeSetting(
"pit.temporary.keep_alive_interval",
timeValueSeconds(30),
Setting.Property.NodeScope
);

public PITController(
public CreatePITController(
CreatePITRequest request,
SearchTransportService searchTransportService,
ClusterService clusterService,
Expand All @@ -81,6 +82,12 @@ private TimeValue getCreatePitTemporaryKeepAlive() {
return CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.get(clusterService.getSettings());
}

/**
* Method for creating PIT reader context
* Phase 1 of create PIT request : Create PIT reader contexts in the associated shards with a temporary keep alive
* Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request and
* fail user request if any of the updates in this phase are failed - we clean up PITs in case of such failures
*/
public void executeCreatePit() {
SearchRequest searchRequest = new SearchRequest(request.getIndices());
searchRequest.preference(request.getPreference());
Expand All @@ -94,7 +101,7 @@ public void executeCreatePit() {
task.getAction(),
() -> task.getDescription(),
task.getParentTaskId(),
task.getHeaders()
new HashMap<>()
);

final StepListener<SearchResponse> createPitListener = new StepListener<>();
Expand All @@ -118,6 +125,7 @@ public void executeCreatePit() {
* Creates PIT reader context with temporary keep alive
*/
void executeCreatePit(Task task, SearchRequest searchRequest, StepListener<SearchResponse> createPitListener) {
logger.debug("Creating PIT context");
transportSearchAction.executeRequest(
task,
searchRequest,
Expand Down Expand Up @@ -152,7 +160,13 @@ void executeUpdatePitId(
ActionListener<CreatePITResponse> updatePitIdListener
) {
createPitListener.whenComplete(searchResponse -> {
CreatePITResponse createPITResponse = new CreatePITResponse(searchResponse);
logger.debug("Updating PIT context with PIT ID, creation time and keep alive");
/**
* store the create time ( same create time for all PIT contexts across shards ) to be used
* for list PIT api
*/
final long creationTime = System.currentTimeMillis();
CreatePITResponse createPITResponse = new CreatePITResponse(searchResponse, creationTime);
SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, createPITResponse.getId());
final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = getConnectionLookupListener(contextId);
lookupListener.whenComplete(nodelookup -> {
Expand All @@ -162,11 +176,6 @@ void executeUpdatePitId(
contextId.shards().size(),
contextId.shards().values()
);
/**
* store the create time ( same create time for all PIT contexts across shards ) to be used
* for list PIT api
*/
long createTime = System.currentTimeMillis();
for (Map.Entry<ShardId, SearchContextIdForNode> entry : contextId.shards().entrySet()) {
DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode());
try {
Expand All @@ -180,7 +189,7 @@ void executeUpdatePitId(
entry.getValue().getSearchContextId(),
createPITResponse.getId(),
request.getKeepAlive().millis(),
createTime
creationTime
),
groupedActionListener
);
Expand All @@ -205,10 +214,10 @@ private StepListener<BiFunction<String, String, DiscoveryNode>> getConnectionLoo

final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = new StepListener<>();

if (clusters.isEmpty() == false) {
searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener);
} else {
if (clusters.isEmpty()) {
lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId));
} else {
searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener);
}
return lookupListener;
}
Expand Down Expand Up @@ -246,7 +255,7 @@ public void onResponse(Integer freed) {

@Override
public void onFailure(Exception e) {
logger.debug("Cleaning up PIT contexts failed ", e);
logger.error("Cleaning up PIT contexts failed ", e);
}
};
ClearScrollController.closeContexts(clusterService.state().getNodes(), searchTransportService, contexts, deleteListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public final String buildDescription() {
Strings.arrayToDelimitedString(indices, ",", sb);
sb.append("], ");
sb.append("pointintime[").append(keepAlive).append("], ");
sb.append("allowPartialPitCreation[").append(allowPartialPitCreation).append("], ");
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,19 @@ public class CreatePITResponse extends ActionResponse implements StatusToXConten
private final int failedShards;
private final int skippedShards;
private final ShardSearchFailure[] shardFailures;
private final long creationTime;

public CreatePITResponse(SearchResponse searchResponse) {
public CreatePITResponse(SearchResponse searchResponse, long creationTime) {
if (searchResponse.pointInTimeId() == null || searchResponse.pointInTimeId().isEmpty()) {
throw new IllegalArgumentException("Point in time ID is empty");
}
this.id = searchResponse.pointInTimeId();
this.totalShards = searchResponse.getTotalShards();
this.successfulShards = searchResponse.getSuccessfulShards();
this.failedShards = searchResponse.getFailedShards();
this.skippedShards = searchResponse.getSkippedShards();
this.shardFailures = searchResponse.getShardFailures();
this.creationTime = creationTime;
}

public CreatePITResponse(StreamInput in) throws IOException {
Expand All @@ -46,6 +51,7 @@ public CreatePITResponse(StreamInput in) throws IOException {
successfulShards = in.readVInt();
failedShards = in.readVInt();
skippedShards = in.readVInt();
creationTime = in.readLong();
int size = in.readVInt();
if (size == 0) {
shardFailures = ShardSearchFailure.EMPTY_ARRAY;
Expand All @@ -70,10 +76,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
getFailedShards(),
getShardFailures()
);
builder.field("creationTime", creationTime);
builder.endObject();
return builder;
}

public long getCreationTime() {
return creationTime;
}

/**
* The failed number of shards the search was executed on.
*/
Expand All @@ -97,6 +108,7 @@ public void writeTo(StreamOutput out) throws IOException {
shardSearchFailure.writeTo(out);
}
out.writeString(id);
out.writeLong(creationTime);
}

public String getId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public TransportCreatePITAction(

@Override
protected void doExecute(Task task, CreatePITRequest request, ActionListener<CreatePITResponse> listener) {
Runnable runnable = new PITController(
Runnable runnable = new CreatePITController(
request,
searchTransportService,
clusterService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
package org.opensearch.common.settings;

import org.apache.logging.log4j.LogManager;
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.action.search.PITController;
import org.opensearch.action.search.CreatePITController;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
Expand Down Expand Up @@ -467,7 +466,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.ALLOW_EXPENSIVE_QUERIES,
SearchService.MAX_OPEN_PIT_CONTEXT,
SearchService.MAX_PIT_KEEPALIVE_SETTING,
PITController.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING,
CreatePITController.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING,
MultiBucketConsumerService.MAX_BUCKET_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
SearchService.MAX_OPEN_SCROLL_CONTEXT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public void preProcess(boolean rewrite) {
}
}

if (sliceBuilder != null) {
if (sliceBuilder != null && scrollContext() != null) {
int sliceLimit = indexService.getIndexSettings().getMaxSlicesPerScroll();
int numSlices = sliceBuilder.getMax();
if (numSlices > sliceLimit) {
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1297,8 +1297,8 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
}

if (source.slice() != null) {
if (context.scrollContext() == null) {
throw new SearchException(shardTarget, "`slice` cannot be used outside of a scroll context");
if (context.scrollContext() == null && !(context.readerContext() instanceof PitReaderContext)) {
throw new SearchException(shardTarget, "`slice` cannot be used outside of a scroll context or PIT context");
}
context.sliceBuilder(source.slice());
}
Expand Down
4 changes: 0 additions & 4 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,6 @@ public String getHeader(String header) {
return headers.get(header);
}

public Map<String, String> getHeaders() {
return headers;
}

public TaskResult result(DiscoveryNode node, Exception error) throws IOException {
return new TaskResult(taskInfo(node.getId(), true, true), error);
}
Expand Down
Loading

0 comments on commit 3359986

Please sign in to comment.