Skip to content

Commit

Permalink
Make scroll timeout configurable.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
  • Loading branch information
Yury-Fridlyand committed Apr 1, 2023
1 parent e951192 commit 4948ac4
Show file tree
Hide file tree
Showing 20 changed files with 181 additions and 68 deletions.
31 changes: 31 additions & 0 deletions integ-test/src/test/java/org/opensearch/sql/sql/PaginationIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@

import java.io.IOException;
import org.json.JSONObject;
import org.junit.Ignore;
import org.junit.Test;
import org.opensearch.client.ResponseException;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.legacy.SQLIntegTestCase;
import org.opensearch.sql.util.TestUtils;

Expand Down Expand Up @@ -45,4 +48,32 @@ public void testLargeDataSetV2() throws IOException {
assertEquals(4, response.getInt("size"));
TestUtils.verifyIsV2Cursor(response);
}

@Ignore("Scroll may not expire after timeout")
// Scroll keep alive parameter guarantees that scroll context would be kept for that time,
// but doesn't define how fast it will be expired after time out.
// With KA = 1s scroll may be kept up to 30 sec or more. We can't test exact expiration.
// I disable the test to prevent it waiting for a minute and delay all CI.
public void testCursorTimeout() throws IOException, InterruptedException {
updateClusterSettings(
new ClusterSetting(PERSISTENT, Settings.Key.SQL_CURSOR_KEEP_ALIVE.getKeyValue(), "1s"));

var query = "SELECT * from " + TEST_INDEX_CALCS;
var response = new JSONObject(executeFetchQuery(query, 4, "jdbc"));
assertTrue(response.has("cursor"));
var cursor = response.getString("cursor");
Thread.sleep(2222L); // > 1s

ResponseException exception =
expectThrows(ResponseException.class, () -> executeCursorQuery(cursor));
response = new JSONObject(TestUtils.getResponseBody(exception.getResponse()));
assertEquals(response.getJSONObject("error").getString("reason"),
"Error occurred in OpenSearch engine: all shards failed");
assertTrue(response.getJSONObject("error").getString("details")
.contains("SearchContextMissingException[No search context found for id"));
assertEquals(response.getJSONObject("error").getString("type"),
"SearchPhaseExecutionException");

wipeAllClusterSettings();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@

package org.opensearch.sql.opensearch.request;

import static org.opensearch.sql.opensearch.request.OpenSearchScrollRequest.DEFAULT_SCROLL_TIMEOUT;

import java.util.function.Consumer;
import java.util.function.Function;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
Expand All @@ -26,11 +26,12 @@
* First (initial) request is handled by {@link InitialPageRequestBuilder}.
*/
@EqualsAndHashCode
@RequiredArgsConstructor
public class ContinuePageRequest implements OpenSearchRequest {
final String initialScrollId;

private final String initialScrollId;
private final TimeValue scrollTimeout;
// ScrollId that OpenSearch returns after search.
String responseScrollId;
private String responseScrollId;

@EqualsAndHashCode.Exclude
@ToString.Exclude
Expand All @@ -40,16 +41,11 @@ public class ContinuePageRequest implements OpenSearchRequest {
@EqualsAndHashCode.Exclude
private boolean scrollFinished = false;

public ContinuePageRequest(String scrollId, OpenSearchExprValueFactory exprValueFactory) {
this.initialScrollId = scrollId;
this.exprValueFactory = exprValueFactory;
}

@Override
public OpenSearchResponse search(Function<SearchRequest, SearchResponse> searchAction,
Function<SearchScrollRequest, SearchResponse> scrollAction) {
SearchResponse openSearchResponse = scrollAction.apply(new SearchScrollRequest(initialScrollId)
.scroll(DEFAULT_SCROLL_TIMEOUT));
.scroll(scrollTimeout));

// TODO if terminated_early - something went wrong, e.g. no scroll returned.
var response = new OpenSearchResponse(openSearchResponse, exprValueFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,35 @@
package org.opensearch.sql.opensearch.request;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;

/**
* Builds a {@link ContinuePageRequest} to handle subsequent pagination/scroll/cursor requests.
* Initial search requests is handled by {@link InitialPageRequestBuilder}.
*/
@RequiredArgsConstructor
public class ContinuePageRequestBuilder extends PagedRequestBuilder {

@Getter
private final OpenSearchRequest.IndexName indexName;
private final String scrollId;
private final TimeValue scrollTimeout;
private final OpenSearchExprValueFactory exprValueFactory;

/** Constructor. */
public ContinuePageRequestBuilder(OpenSearchRequest.IndexName indexName,
String scrollId,
Settings settings,
OpenSearchExprValueFactory exprValueFactory) {
this.indexName = indexName;
this.scrollId = scrollId;
this.scrollTimeout = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE);
this.exprValueFactory = exprValueFactory;
}

@Override
public OpenSearchRequest build() {
return new ContinuePageRequest(scrollId, exprValueFactory);
return new ContinuePageRequest(scrollId, scrollTimeout, exprValueFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
import java.util.Map;
import java.util.Set;
import lombok.Getter;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
Expand All @@ -27,6 +28,7 @@ public class InitialPageRequestBuilder extends PagedRequestBuilder {
private final OpenSearchRequest.IndexName indexName;
private final SearchSourceBuilder sourceBuilder;
private final OpenSearchExprValueFactory exprValueFactory;
private final TimeValue scrollTimeout;

/**
* Constructor.
Expand All @@ -37,9 +39,11 @@ public class InitialPageRequestBuilder extends PagedRequestBuilder {
// TODO accept indexName as string (same way as `OpenSearchRequestBuilder` does)?
public InitialPageRequestBuilder(OpenSearchRequest.IndexName indexName,
int pageSize,
Settings settings,
OpenSearchExprValueFactory exprValueFactory) {
this.indexName = indexName;
this.exprValueFactory = exprValueFactory;
this.scrollTimeout = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE);
this.sourceBuilder = new SearchSourceBuilder()
.from(0)
.size(pageSize)
Expand All @@ -48,7 +52,7 @@ public InitialPageRequestBuilder(OpenSearchRequest.IndexName indexName,

@Override
public OpenSearchScrollRequest build() {
return new OpenSearchScrollRequest(indexName, sourceBuilder, exprValueFactory);
return new OpenSearchScrollRequest(indexName, scrollTimeout, sourceBuilder, exprValueFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public class OpenSearchRequestBuilder implements PushDownRequestBuilder {
*/
private int querySize;

/**
* Scroll context life time.
*/
private final TimeValue scrollTimeout;

public OpenSearchRequestBuilder(String indexName,
Integer maxResultWindow,
Settings settings,
Expand All @@ -93,12 +98,13 @@ public OpenSearchRequestBuilder(OpenSearchRequest.IndexName indexName,
OpenSearchExprValueFactory exprValueFactory) {
this.indexName = indexName;
this.maxResultWindow = maxResultWindow;
this.sourceBuilder = new SearchSourceBuilder();
this.exprValueFactory = exprValueFactory;
this.scrollTimeout = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE);
this.querySize = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT);
sourceBuilder.from(0);
sourceBuilder.size(querySize);
sourceBuilder.timeout(DEFAULT_QUERY_TIMEOUT);
this.sourceBuilder = new SearchSourceBuilder()
.from(0)
.size(querySize)
.timeout(DEFAULT_QUERY_TIMEOUT);
}

/**
Expand All @@ -112,7 +118,8 @@ public OpenSearchRequest build() {

if (from + size > maxResultWindow) {
sourceBuilder.size(maxResultWindow - from);
return new OpenSearchScrollRequest(indexName, sourceBuilder, exprValueFactory);
return new OpenSearchScrollRequest(
indexName, scrollTimeout, sourceBuilder, exprValueFactory);
} else {
return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
@ToString
public class OpenSearchScrollRequest implements OpenSearchRequest {

/** Default scroll context timeout in minutes. */
public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(100L);
/** Scroll context timeout. */
private final TimeValue scrollTimeout;

/**
* {@link OpenSearchRequest.IndexName}.
Expand All @@ -58,22 +58,13 @@ public class OpenSearchScrollRequest implements OpenSearchRequest {
/** Search request source builder. */
private final SearchSourceBuilder sourceBuilder;

/** Constructor. */
public OpenSearchScrollRequest(IndexName indexName, OpenSearchExprValueFactory exprValueFactory) {
this.indexName = indexName;
this.sourceBuilder = new SearchSourceBuilder();
this.exprValueFactory = exprValueFactory;
}

public OpenSearchScrollRequest(String indexName, OpenSearchExprValueFactory exprValueFactory) {
this(new IndexName(indexName), exprValueFactory);
}

/** Constructor. */
public OpenSearchScrollRequest(IndexName indexName,
TimeValue scrollTimeout,
SearchSourceBuilder sourceBuilder,
OpenSearchExprValueFactory exprValueFactory) {
this.indexName = indexName;
this.scrollTimeout = scrollTimeout;
this.sourceBuilder = sourceBuilder;
this.exprValueFactory = exprValueFactory;
}
Expand Down Expand Up @@ -117,7 +108,7 @@ public void clean(Consumer<String> cleanAction) {
public SearchRequest searchRequest() {
return new SearchRequest()
.indices(indexName.getIndexNames())
.scroll(DEFAULT_SCROLL_TIMEOUT)
.scroll(scrollTimeout)
.source(sourceBuilder);
}

Expand All @@ -137,7 +128,7 @@ public boolean isScroll() {
*/
public SearchScrollRequest scrollRequest() {
Objects.requireNonNull(scrollId, "Scroll id cannot be null");
return new SearchScrollRequest().scroll(DEFAULT_SCROLL_TIMEOUT).scrollId(scrollId);
return new SearchScrollRequest().scroll(scrollTimeout).scrollId(scrollId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public TableScanBuilder createScanBuilder() {

@Override
public TableScanBuilder createPagedScanBuilder(int pageSize) {
var requestBuilder = new InitialPageRequestBuilder(indexName, pageSize,
var requestBuilder = new InitialPageRequestBuilder(indexName, pageSize, settings,
new OpenSearchExprValueFactory(getFieldOpenSearchTypes()));
var indexScan = new OpenSearchPagedIndexScan(client, requestBuilder);
return new OpenSearchPagedIndexScanBuilder(indexScan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public TableScanOperator getTableScan(String indexName, String scrollId) {
var index = new OpenSearchIndex(client, settings, indexName);
var requestBuilder = new ContinuePageRequestBuilder(
new OpenSearchRequest.IndexName(indexName),
scrollId,
scrollId, settings,
new OpenSearchExprValueFactory(index.getFieldOpenSearchTypes()));
return new OpenSearchPagedIndexScan(client, requestBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.DeprecationHandler;
Expand All @@ -69,13 +70,15 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.data.model.ExprIntegerValue;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
import org.opensearch.sql.opensearch.data.type.OpenSearchTextType;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.request.OpenSearchScrollRequest;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;

Expand Down Expand Up @@ -322,7 +325,9 @@ void search() {
when(scrollResponse.getHits()).thenReturn(SearchHits.empty());

// Verify response for first scroll request
OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory);
OpenSearchScrollRequest request = new OpenSearchScrollRequest(
new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1),
new SearchSourceBuilder(), factory);
OpenSearchResponse response1 = client.search(request);
assertFalse(response1.isEmpty());

Expand Down Expand Up @@ -355,7 +360,9 @@ void cleanup() {
when(requestBuilder.addScrollId(any())).thenReturn(requestBuilder);
when(requestBuilder.get()).thenReturn(null);

OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory);
OpenSearchScrollRequest request = new OpenSearchScrollRequest(
new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1),
new SearchSourceBuilder(), factory);
request.setScrollId("scroll123");
// Enforce cleaning by setting a private field.
FieldUtils.writeField(request, "needClean", true, true);
Expand All @@ -370,7 +377,9 @@ void cleanup() {

@Test
void cleanup_without_scrollId() {
OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory);
OpenSearchScrollRequest request = new OpenSearchScrollRequest(
new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1),
new SearchSourceBuilder(), factory);
client.cleanup(request);
verify(nodeClient, never()).prepareClearScroll();
}
Expand All @@ -380,7 +389,9 @@ void cleanup_without_scrollId() {
void cleanup_rethrows_exception() {
when(nodeClient.prepareClearScroll()).thenThrow(new RuntimeException());

OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory);
OpenSearchScrollRequest request = new OpenSearchScrollRequest(
new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1),
new SearchSourceBuilder(), factory);
request.setScrollId("scroll123");
// Enforce cleaning by setting a private field.
FieldUtils.writeField(request, "needClean", true, true);
Expand Down
Loading

0 comments on commit 4948ac4

Please sign in to comment.