From 4948ac4437560ebc2dfa8d31b642d93ed16b31b4 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Fri, 31 Mar 2023 18:27:12 -0700 Subject: [PATCH] Make scroll timeout configurable. Signed-off-by: Yury-Fridlyand --- .../org/opensearch/sql/sql/PaginationIT.java | 31 +++++++++++++++++++ .../request/ContinuePageRequest.java | 18 +++++------ .../request/ContinuePageRequestBuilder.java | 18 +++++++++-- .../request/InitialPageRequestBuilder.java | 8 +++-- .../request/OpenSearchRequestBuilder.java | 17 +++++++--- .../request/OpenSearchScrollRequest.java | 21 ++++--------- .../opensearch/storage/OpenSearchIndex.java | 2 +- .../storage/OpenSearchStorageEngine.java | 2 +- .../client/OpenSearchNodeClientTest.java | 19 +++++++++--- .../client/OpenSearchRestClientTest.java | 27 ++++++++++++---- .../OpenSearchExecutionEngineTest.java | 5 +++ .../OpenSearchExecutionProtectorTest.java | 8 +++-- .../ContinuePageRequestBuilderTest.java | 13 ++++++-- .../request/ContinuePageRequestTest.java | 6 ++-- .../InitialPageRequestBuilderTest.java | 14 +++++++-- .../request/OpenSearchRequestBuilderTest.java | 4 ++- .../request/OpenSearchScrollRequestTest.java | 10 +++--- .../storage/OpenSearchIndexTest.java | 13 +++++++- .../storage/scan/OpenSearchIndexScanTest.java | 3 ++ .../scan/OpenSearchPagedIndexScanTest.java | 10 +++--- 20 files changed, 181 insertions(+), 68 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/PaginationIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/PaginationIT.java index b9e32cb1cd..a1d353cde8 100644 --- a/integ-test/src/test/java/org/opensearch/sql/sql/PaginationIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/sql/PaginationIT.java @@ -10,7 +10,10 @@ import java.io.IOException; import org.json.JSONObject; +import org.junit.Ignore; import org.junit.Test; +import org.opensearch.client.ResponseException; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.legacy.SQLIntegTestCase; import org.opensearch.sql.util.TestUtils; @@ -45,4 +48,32 @@ public void testLargeDataSetV2() throws IOException { assertEquals(4, response.getInt("size")); TestUtils.verifyIsV2Cursor(response); } + + @Ignore("Scroll may not expire after timeout") + // Scroll keep alive parameter guarantees that scroll context would be kept for that time, + // but doesn't define how fast it will be expired after time out. + // With KA = 1s scroll may be kept up to 30 sec or more. We can't test exact expiration. + // I disable the test to prevent it waiting for a minute and delay all CI. + public void testCursorTimeout() throws IOException, InterruptedException { + updateClusterSettings( + new ClusterSetting(PERSISTENT, Settings.Key.SQL_CURSOR_KEEP_ALIVE.getKeyValue(), "1s")); + + var query = "SELECT * from " + TEST_INDEX_CALCS; + var response = new JSONObject(executeFetchQuery(query, 4, "jdbc")); + assertTrue(response.has("cursor")); + var cursor = response.getString("cursor"); + Thread.sleep(2222L); // > 1s + + ResponseException exception = + expectThrows(ResponseException.class, () -> executeCursorQuery(cursor)); + response = new JSONObject(TestUtils.getResponseBody(exception.getResponse())); + assertEquals(response.getJSONObject("error").getString("reason"), + "Error occurred in OpenSearch engine: all shards failed"); + assertTrue(response.getJSONObject("error").getString("details") + .contains("SearchContextMissingException[No search context found for id")); + assertEquals(response.getJSONObject("error").getString("type"), + "SearchPhaseExecutionException"); + + wipeAllClusterSettings(); + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/ContinuePageRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/ContinuePageRequest.java index 6c81b9aca2..1ad6207682 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/ContinuePageRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/ContinuePageRequest.java @@ -5,16 +5,16 @@ package org.opensearch.sql.opensearch.request; -import static org.opensearch.sql.opensearch.request.OpenSearchScrollRequest.DEFAULT_SCROLL_TIMEOUT; - import java.util.function.Consumer; import java.util.function.Function; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.ToString; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchScrollRequest; +import org.opensearch.common.unit.TimeValue; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.response.OpenSearchResponse; @@ -26,11 +26,12 @@ * First (initial) request is handled by {@link InitialPageRequestBuilder}. */ @EqualsAndHashCode +@RequiredArgsConstructor public class ContinuePageRequest implements OpenSearchRequest { - final String initialScrollId; - + private final String initialScrollId; + private final TimeValue scrollTimeout; // ScrollId that OpenSearch returns after search. - String responseScrollId; + private String responseScrollId; @EqualsAndHashCode.Exclude @ToString.Exclude @@ -40,16 +41,11 @@ public class ContinuePageRequest implements OpenSearchRequest { @EqualsAndHashCode.Exclude private boolean scrollFinished = false; - public ContinuePageRequest(String scrollId, OpenSearchExprValueFactory exprValueFactory) { - this.initialScrollId = scrollId; - this.exprValueFactory = exprValueFactory; - } - @Override public OpenSearchResponse search(Function searchAction, Function scrollAction) { SearchResponse openSearchResponse = scrollAction.apply(new SearchScrollRequest(initialScrollId) - .scroll(DEFAULT_SCROLL_TIMEOUT)); + .scroll(scrollTimeout)); // TODO if terminated_early - something went wrong, e.g. no scroll returned. var response = new OpenSearchResponse(openSearchResponse, exprValueFactory); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/ContinuePageRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/ContinuePageRequestBuilder.java index 78288c1242..a0c19c1d0a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/ContinuePageRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/ContinuePageRequestBuilder.java @@ -6,23 +6,35 @@ package org.opensearch.sql.opensearch.request; import lombok.Getter; -import lombok.RequiredArgsConstructor; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; /** * Builds a {@link ContinuePageRequest} to handle subsequent pagination/scroll/cursor requests. * Initial search requests is handled by {@link InitialPageRequestBuilder}. */ -@RequiredArgsConstructor public class ContinuePageRequestBuilder extends PagedRequestBuilder { @Getter private final OpenSearchRequest.IndexName indexName; private final String scrollId; + private final TimeValue scrollTimeout; private final OpenSearchExprValueFactory exprValueFactory; + /** Constructor. */ + public ContinuePageRequestBuilder(OpenSearchRequest.IndexName indexName, + String scrollId, + Settings settings, + OpenSearchExprValueFactory exprValueFactory) { + this.indexName = indexName; + this.scrollId = scrollId; + this.scrollTimeout = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE); + this.exprValueFactory = exprValueFactory; + } + @Override public OpenSearchRequest build() { - return new ContinuePageRequest(scrollId, exprValueFactory); + return new ContinuePageRequest(scrollId, scrollTimeout, exprValueFactory); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/InitialPageRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/InitialPageRequestBuilder.java index dee009ee97..8023a86006 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/InitialPageRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/InitialPageRequestBuilder.java @@ -10,8 +10,9 @@ import java.util.Map; import java.util.Set; import lombok.Getter; +import org.opensearch.common.unit.TimeValue; import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.opensearch.data.type.OpenSearchDataType; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; @@ -27,6 +28,7 @@ public class InitialPageRequestBuilder extends PagedRequestBuilder { private final OpenSearchRequest.IndexName indexName; private final SearchSourceBuilder sourceBuilder; private final OpenSearchExprValueFactory exprValueFactory; + private final TimeValue scrollTimeout; /** * Constructor. @@ -37,9 +39,11 @@ public class InitialPageRequestBuilder extends PagedRequestBuilder { // TODO accept indexName as string (same way as `OpenSearchRequestBuilder` does)? public InitialPageRequestBuilder(OpenSearchRequest.IndexName indexName, int pageSize, + Settings settings, OpenSearchExprValueFactory exprValueFactory) { this.indexName = indexName; this.exprValueFactory = exprValueFactory; + this.scrollTimeout = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE); this.sourceBuilder = new SearchSourceBuilder() .from(0) .size(pageSize) @@ -48,7 +52,7 @@ public InitialPageRequestBuilder(OpenSearchRequest.IndexName indexName, @Override public OpenSearchScrollRequest build() { - return new OpenSearchScrollRequest(indexName, sourceBuilder, exprValueFactory); + return new OpenSearchScrollRequest(indexName, scrollTimeout, sourceBuilder, exprValueFactory); } /** diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java index 531710d545..6d5a8cf005 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java @@ -76,6 +76,11 @@ public class OpenSearchRequestBuilder implements PushDownRequestBuilder { */ private int querySize; + /** + * Scroll context life time. + */ + private final TimeValue scrollTimeout; + public OpenSearchRequestBuilder(String indexName, Integer maxResultWindow, Settings settings, @@ -93,12 +98,13 @@ public OpenSearchRequestBuilder(OpenSearchRequest.IndexName indexName, OpenSearchExprValueFactory exprValueFactory) { this.indexName = indexName; this.maxResultWindow = maxResultWindow; - this.sourceBuilder = new SearchSourceBuilder(); this.exprValueFactory = exprValueFactory; + this.scrollTimeout = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE); this.querySize = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT); - sourceBuilder.from(0); - sourceBuilder.size(querySize); - sourceBuilder.timeout(DEFAULT_QUERY_TIMEOUT); + this.sourceBuilder = new SearchSourceBuilder() + .from(0) + .size(querySize) + .timeout(DEFAULT_QUERY_TIMEOUT); } /** @@ -112,7 +118,8 @@ public OpenSearchRequest build() { if (from + size > maxResultWindow) { sourceBuilder.size(maxResultWindow - from); - return new OpenSearchScrollRequest(indexName, sourceBuilder, exprValueFactory); + return new OpenSearchScrollRequest( + indexName, scrollTimeout, sourceBuilder, exprValueFactory); } else { return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java index 8dceee99ee..2e723c949c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java @@ -32,8 +32,8 @@ @ToString public class OpenSearchScrollRequest implements OpenSearchRequest { - /** Default scroll context timeout in minutes. */ - public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(100L); + /** Scroll context timeout. */ + private final TimeValue scrollTimeout; /** * {@link OpenSearchRequest.IndexName}. @@ -58,22 +58,13 @@ public class OpenSearchScrollRequest implements OpenSearchRequest { /** Search request source builder. */ private final SearchSourceBuilder sourceBuilder; - /** Constructor. */ - public OpenSearchScrollRequest(IndexName indexName, OpenSearchExprValueFactory exprValueFactory) { - this.indexName = indexName; - this.sourceBuilder = new SearchSourceBuilder(); - this.exprValueFactory = exprValueFactory; - } - - public OpenSearchScrollRequest(String indexName, OpenSearchExprValueFactory exprValueFactory) { - this(new IndexName(indexName), exprValueFactory); - } - /** Constructor. */ public OpenSearchScrollRequest(IndexName indexName, + TimeValue scrollTimeout, SearchSourceBuilder sourceBuilder, OpenSearchExprValueFactory exprValueFactory) { this.indexName = indexName; + this.scrollTimeout = scrollTimeout; this.sourceBuilder = sourceBuilder; this.exprValueFactory = exprValueFactory; } @@ -117,7 +108,7 @@ public void clean(Consumer cleanAction) { public SearchRequest searchRequest() { return new SearchRequest() .indices(indexName.getIndexNames()) - .scroll(DEFAULT_SCROLL_TIMEOUT) + .scroll(scrollTimeout) .source(sourceBuilder); } @@ -137,7 +128,7 @@ public boolean isScroll() { */ public SearchScrollRequest scrollRequest() { Objects.requireNonNull(scrollId, "Scroll id cannot be null"); - return new SearchScrollRequest().scroll(DEFAULT_SCROLL_TIMEOUT).scrollId(scrollId); + return new SearchScrollRequest().scroll(scrollTimeout).scrollId(scrollId); } /** diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index 288bb6006a..110d3d640f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -165,7 +165,7 @@ public TableScanBuilder createScanBuilder() { @Override public TableScanBuilder createPagedScanBuilder(int pageSize) { - var requestBuilder = new InitialPageRequestBuilder(indexName, pageSize, + var requestBuilder = new InitialPageRequestBuilder(indexName, pageSize, settings, new OpenSearchExprValueFactory(getFieldOpenSearchTypes())); var indexScan = new OpenSearchPagedIndexScan(client, requestBuilder); return new OpenSearchPagedIndexScanBuilder(indexScan); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java index a5f5f372ad..14535edb79 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java @@ -45,7 +45,7 @@ public TableScanOperator getTableScan(String indexName, String scrollId) { var index = new OpenSearchIndex(client, settings, indexName); var requestBuilder = new ContinuePageRequestBuilder( new OpenSearchRequest.IndexName(indexName), - scrollId, + scrollId, settings, new OpenSearchExprValueFactory(index.getFieldOpenSearchTypes())); return new OpenSearchPagedIndexScan(client, requestBuilder); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java index 7787229603..dc9d7a5b5e 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java @@ -61,6 +61,7 @@ import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.xcontent.DeprecationHandler; @@ -69,6 +70,7 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.data.model.ExprIntegerValue; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; @@ -76,6 +78,7 @@ import org.opensearch.sql.opensearch.data.type.OpenSearchTextType; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.mapping.IndexMapping; +import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.request.OpenSearchScrollRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; @@ -322,7 +325,9 @@ void search() { when(scrollResponse.getHits()).thenReturn(SearchHits.empty()); // Verify response for first scroll request - OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); + OpenSearchScrollRequest request = new OpenSearchScrollRequest( + new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + new SearchSourceBuilder(), factory); OpenSearchResponse response1 = client.search(request); assertFalse(response1.isEmpty()); @@ -355,7 +360,9 @@ void cleanup() { when(requestBuilder.addScrollId(any())).thenReturn(requestBuilder); when(requestBuilder.get()).thenReturn(null); - OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); + OpenSearchScrollRequest request = new OpenSearchScrollRequest( + new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + new SearchSourceBuilder(), factory); request.setScrollId("scroll123"); // Enforce cleaning by setting a private field. FieldUtils.writeField(request, "needClean", true, true); @@ -370,7 +377,9 @@ void cleanup() { @Test void cleanup_without_scrollId() { - OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); + OpenSearchScrollRequest request = new OpenSearchScrollRequest( + new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + new SearchSourceBuilder(), factory); client.cleanup(request); verify(nodeClient, never()).prepareClearScroll(); } @@ -380,7 +389,9 @@ void cleanup_without_scrollId() { void cleanup_rethrows_exception() { when(nodeClient.prepareClearScroll()).thenThrow(new RuntimeException()); - OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); + OpenSearchScrollRequest request = new OpenSearchScrollRequest( + new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + new SearchSourceBuilder(), factory); request.setScrollId("scroll123"); // Enforce cleaning by setting a private field. FieldUtils.writeField(request, "needClean", true, true); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java index b8920e52a6..6abd17a6fb 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java @@ -55,12 +55,14 @@ import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.data.model.ExprIntegerValue; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; @@ -68,6 +70,7 @@ import org.opensearch.sql.opensearch.data.type.OpenSearchTextType; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.mapping.IndexMapping; +import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.request.OpenSearchScrollRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; @@ -303,7 +306,9 @@ void search() throws IOException { when(scrollResponse.getHits()).thenReturn(SearchHits.empty()); // Verify response for first scroll request - OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); + OpenSearchScrollRequest request = new OpenSearchScrollRequest( + new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + new SearchSourceBuilder(), factory); OpenSearchResponse response1 = client.search(request); assertFalse(response1.isEmpty()); @@ -323,7 +328,9 @@ void search_with_IOException() throws IOException { when(restClient.search(any(), any())).thenThrow(new IOException()); assertThrows( IllegalStateException.class, - () -> client.search(new OpenSearchScrollRequest("test", factory))); + () -> client.search(new OpenSearchScrollRequest( + new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + new SearchSourceBuilder(), factory))); } @Test @@ -343,7 +350,9 @@ void scroll_with_IOException() throws IOException { when(restClient.scroll(any(), any())).thenThrow(new IOException()); // First request run successfully - OpenSearchScrollRequest scrollRequest = new OpenSearchScrollRequest("test", factory); + OpenSearchScrollRequest scrollRequest = new OpenSearchScrollRequest( + new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + new SearchSourceBuilder(), factory); client.search(scrollRequest); assertThrows( IllegalStateException.class, () -> client.search(scrollRequest)); @@ -362,7 +371,9 @@ void schedule() { @Test @SneakyThrows void cleanup() { - OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); + OpenSearchScrollRequest request = new OpenSearchScrollRequest( + new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + new SearchSourceBuilder(), factory); // Enforce cleaning by setting a private field. FieldUtils.writeField(request, "needClean", true, true); request.setScrollId("scroll123"); @@ -373,7 +384,9 @@ void cleanup() { @Test void cleanup_without_scrollId() throws IOException { - OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); + OpenSearchScrollRequest request = new OpenSearchScrollRequest( + new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + new SearchSourceBuilder(), factory); client.cleanup(request); verify(restClient, never()).clearScroll(any(), any()); } @@ -383,7 +396,9 @@ void cleanup_without_scrollId() throws IOException { void cleanup_with_IOException() { when(restClient.clearScroll(any(), any())).thenThrow(new IOException()); - OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); + OpenSearchScrollRequest request = new OpenSearchScrollRequest( + new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + new SearchSourceBuilder(), factory); // Enforce cleaning by setting a private field. FieldUtils.writeField(request, "needClean", true, true); request.setScrollId("scroll123"); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java index b6b0269625..d762fbe2fa 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.sql.common.setting.Settings.Key.QUERY_SIZE_LIMIT; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE; import static org.opensearch.sql.data.model.ExprValueUtils.tupleValue; import static org.opensearch.sql.executor.ExecutionEngine.QueryResponse; @@ -35,6 +36,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.model.ExprValue; @@ -171,6 +173,9 @@ void explain_successfully() { new PaginatedPlanCache(null)); Settings settings = mock(Settings.class); when(settings.getSettingValue(QUERY_SIZE_LIMIT)).thenReturn(100); + when(settings.getSettingValue(SQL_CURSOR_KEEP_ALIVE)) + .thenReturn(TimeValue.timeValueMinutes(1)); + PhysicalPlan plan = new OpenSearchIndexScan(mock(OpenSearchClient.class), new OpenSearchRequestBuilder("test", 10000, settings, mock(OpenSearchExprValueFactory.class))); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java index d0e486fae9..cf684b9409 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java @@ -11,6 +11,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.sql.ast.tree.Sort.SortOption.DEFAULT_ASC; +import static org.opensearch.sql.common.setting.Settings.Key.QUERY_SIZE_LIMIT; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE; import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE; import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; import static org.opensearch.sql.data.type.ExprCoreType.STRING; @@ -36,6 +38,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.client.node.NodeClient; +import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.ast.expression.DataType; import org.opensearch.sql.ast.expression.Literal; import org.opensearch.sql.ast.tree.RareTopN.CommandType; @@ -88,8 +91,9 @@ public void setup() { @Test public void testProtectIndexScan() { - when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); - + when(settings.getSettingValue(QUERY_SIZE_LIMIT)).thenReturn(200); + when(settings.getSettingValue(SQL_CURSOR_KEEP_ALIVE)) + .thenReturn(TimeValue.timeValueMinutes(1)); String indexName = "test"; Integer maxResultWindow = 10000; NamedExpression include = named("age", ref("age", INTEGER)); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/ContinuePageRequestBuilderTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/ContinuePageRequestBuilderTest.java index d549ed9200..e449126d1c 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/ContinuePageRequestBuilderTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/ContinuePageRequestBuilderTest.java @@ -6,6 +6,7 @@ package org.opensearch.sql.opensearch.request; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayNameGeneration; @@ -14,6 +15,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; @DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) @@ -23,6 +26,9 @@ public class ContinuePageRequestBuilderTest { @Mock private OpenSearchExprValueFactory exprValueFactory; + @Mock + private Settings settings; + private final OpenSearchRequest.IndexName indexName = new OpenSearchRequest.IndexName("test"); private final String scrollId = "scroll"; @@ -30,13 +36,16 @@ public class ContinuePageRequestBuilderTest { @BeforeEach void setup() { - requestBuilder = new ContinuePageRequestBuilder(indexName, scrollId, exprValueFactory); + when(settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE)) + .thenReturn(TimeValue.timeValueMinutes(1)); + requestBuilder = new ContinuePageRequestBuilder( + indexName, scrollId, settings, exprValueFactory); } @Test public void build() { assertEquals( - new ContinuePageRequest(scrollId, exprValueFactory), + new ContinuePageRequest(scrollId, TimeValue.timeValueMinutes(1), exprValueFactory), requestBuilder.build() ); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/ContinuePageRequestTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/ContinuePageRequestTest.java index 32a15f5e8c..e991fc5787 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/ContinuePageRequestTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/ContinuePageRequestTest.java @@ -33,6 +33,7 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchScrollRequest; +import org.opensearch.common.unit.TimeValue; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; @@ -66,7 +67,8 @@ public class ContinuePageRequestTest { private final String scroll = "scroll"; private final String nextScroll = "nextScroll"; - private final ContinuePageRequest request = new ContinuePageRequest(scroll, factory); + private final ContinuePageRequest request = new ContinuePageRequest( + scroll, TimeValue.timeValueMinutes(1), factory); @Test public void search_with_non_empty_response() { @@ -118,7 +120,7 @@ public void getters() { factory = mock(); assertAll( () -> assertThrows(Throwable.class, request::getSourceBuilder), - () -> assertSame(factory, new ContinuePageRequest("", factory).getExprValueFactory()) + () -> assertSame(factory, new ContinuePageRequest("", null, factory).getExprValueFactory()) ); } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/InitialPageRequestBuilderTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/InitialPageRequestBuilderTest.java index beebb6a0ac..9d4c0b8dbe 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/InitialPageRequestBuilderTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/InitialPageRequestBuilderTest.java @@ -10,6 +10,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; import static org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder.DEFAULT_QUERY_TIMEOUT; @@ -22,7 +23,9 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.common.unit.TimeValue; import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.ReferenceExpression; @@ -36,6 +39,9 @@ public class InitialPageRequestBuilderTest { @Mock private OpenSearchExprValueFactory exprValueFactory; + @Mock + private Settings settings; + private final int pageSize = 42; private final OpenSearchRequest.IndexName indexName = new OpenSearchRequest.IndexName("test"); @@ -44,14 +50,16 @@ public class InitialPageRequestBuilderTest { @BeforeEach void setup() { + when(settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE)) + .thenReturn(TimeValue.timeValueMinutes(1)); requestBuilder = new InitialPageRequestBuilder( - indexName, pageSize, exprValueFactory); + indexName, pageSize, settings, exprValueFactory); } @Test public void build() { assertEquals( - new OpenSearchScrollRequest(indexName, + new OpenSearchScrollRequest(indexName, TimeValue.timeValueMinutes(1), new SearchSourceBuilder() .from(0) .size(pageSize) @@ -91,7 +99,7 @@ public void pushDownProject() { requestBuilder.pushDownProjects(references); assertEquals( - new OpenSearchScrollRequest(indexName, + new OpenSearchScrollRequest(indexName, TimeValue.timeValueMinutes(1), new SearchSourceBuilder() .from(0) .size(pageSize) diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java index 636142207e..49283e61b9 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java @@ -65,6 +65,8 @@ public class OpenSearchRequestBuilderTest { @BeforeEach void setup() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE)) + .thenReturn(TimeValue.timeValueMinutes(1)); requestBuilder = new OpenSearchRequestBuilder( "test", MAX_RESULT_WINDOW, settings, exprValueFactory); @@ -95,7 +97,7 @@ void build_scroll_request_with_correct_size() { assertEquals( new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), + new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), new SearchSourceBuilder() .from(offset) .size(MAX_RESULT_WINDOW - offset) diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java index 6e45476306..3ad6ad226d 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java @@ -25,6 +25,7 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchScrollRequest; +import org.opensearch.common.unit.TimeValue; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; @@ -38,8 +39,9 @@ class OpenSearchScrollRequestTest { @Mock private OpenSearchExprValueFactory factory; - private final OpenSearchScrollRequest request = - new OpenSearchScrollRequest("test", factory); + private final OpenSearchScrollRequest request = new OpenSearchScrollRequest( + new OpenSearchRequest.IndexName("test"), TimeValue.timeValueMinutes(1), + new SearchSourceBuilder(), factory); @Test void searchRequest() { @@ -48,7 +50,7 @@ void searchRequest() { assertEquals( new SearchRequest() .indices("test") - .scroll(OpenSearchScrollRequest.DEFAULT_SCROLL_TIMEOUT) + .scroll(TimeValue.timeValueMinutes(1)) .source(new SearchSourceBuilder().query(QueryBuilders.termQuery("name", "John"))), request.searchRequest()); } @@ -69,7 +71,7 @@ void scrollRequest() { request.setScrollId("scroll123"); assertEquals( new SearchScrollRequest() - .scroll(OpenSearchScrollRequest.DEFAULT_SCROLL_TIMEOUT) + .scroll(TimeValue.timeValueMinutes(1)) .scrollId("scroll123"), request.scrollRequest()); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java index 6705c1ef02..7181bd5e56 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java @@ -14,6 +14,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE; import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; @@ -41,6 +42,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.ast.tree.Sort; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.model.ExprBooleanValue; @@ -192,6 +194,8 @@ void checkCacheUsedForFieldMappings() { @Test void implementRelationOperatorOnly() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE)) + .thenReturn(TimeValue.timeValueMinutes(1)); when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); LogicalPlan plan = index.createScanBuilder(); @@ -205,17 +209,22 @@ void implementRelationOperatorOnly() { @Test void implementPagedRelationOperatorOnly() { when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); + when(settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE)) + .thenReturn(TimeValue.timeValueMinutes(1)); LogicalPlan plan = index.createPagedScanBuilder(42); Integer maxResultWindow = index.getMaxResultWindow(); PagedRequestBuilder builder = new InitialPageRequestBuilder( - new OpenSearchRequest.IndexName(indexName), maxResultWindow, exprValueFactory); + new OpenSearchRequest.IndexName(indexName), + maxResultWindow, mock(), exprValueFactory); assertEquals(new OpenSearchPagedIndexScan(client, builder), index.implement(plan)); } @Test void implementRelationOperatorWithOptimization() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE)) + .thenReturn(TimeValue.timeValueMinutes(1)); when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); LogicalPlan plan = index.createScanBuilder(); @@ -231,6 +240,8 @@ void implementRelationOperatorWithOptimization() { @Test void implementOtherLogicalOperators() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE)) + .thenReturn(TimeValue.timeValueMinutes(1)); when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); NamedExpression include = named("age", ref("age", INTEGER)); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java index 8cc0d46884..c133897ca2 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java @@ -32,6 +32,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.unit.TimeValue; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.SearchHit; @@ -67,6 +68,8 @@ class OpenSearchIndexScanTest { @BeforeEach void setup() { when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + when(settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE)) + .thenReturn(TimeValue.timeValueMinutes(1)); } @Test diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScanTest.java index 65c0ddffc2..38888115c9 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScanTest.java @@ -53,7 +53,7 @@ public class OpenSearchPagedIndexScanTest { void query_empty_result() { mockResponse(client); InitialPageRequestBuilder builder = new InitialPageRequestBuilder( - new OpenSearchRequest.IndexName("test"), 3, exprValueFactory); + new OpenSearchRequest.IndexName("test"), 3, mock(), exprValueFactory); try (OpenSearchPagedIndexScan indexScan = new OpenSearchPagedIndexScan(client, builder)) { indexScan.open(); assertFalse(indexScan.hasNext()); @@ -69,7 +69,7 @@ void query_all_results_initial_scroll_request() { employee(3, "Allen", "IT")}); PagedRequestBuilder builder = new InitialPageRequestBuilder( - new OpenSearchRequest.IndexName("test"), 3, exprValueFactory); + new OpenSearchRequest.IndexName("test"), 3, mock(), exprValueFactory); try (OpenSearchPagedIndexScan indexScan = new OpenSearchPagedIndexScan(client, builder)) { indexScan.open(); @@ -90,7 +90,7 @@ void query_all_results_initial_scroll_request() { verify(client).cleanup(any()); builder = new ContinuePageRequestBuilder( - new OpenSearchRequest.IndexName("test"), "scroll", exprValueFactory); + new OpenSearchRequest.IndexName("test"), "scroll", mock(), exprValueFactory); try (OpenSearchPagedIndexScan indexScan = new OpenSearchPagedIndexScan(client, builder)) { indexScan.open(); @@ -107,7 +107,7 @@ void query_all_results_continuation_scroll_request() { employee(3, "Allen", "IT")}); ContinuePageRequestBuilder builder = new ContinuePageRequestBuilder( - new OpenSearchRequest.IndexName("test"), "scroll", exprValueFactory); + new OpenSearchRequest.IndexName("test"), "scroll", mock(), exprValueFactory); try (OpenSearchPagedIndexScan indexScan = new OpenSearchPagedIndexScan(client, builder)) { indexScan.open(); @@ -128,7 +128,7 @@ void query_all_results_continuation_scroll_request() { verify(client).cleanup(any()); builder = new ContinuePageRequestBuilder( - new OpenSearchRequest.IndexName("test"), "scroll", exprValueFactory); + new OpenSearchRequest.IndexName("test"), "scroll", mock(), exprValueFactory); try (OpenSearchPagedIndexScan indexScan = new OpenSearchPagedIndexScan(client, builder)) { indexScan.open();