Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pagination, phase 1: Add unit tests for :opensearch module with coverage. #233

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.planner;

import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.data.model.ExprValue;
Expand All @@ -15,6 +16,7 @@
import org.opensearch.sql.planner.physical.ProjectOperator;

@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class PaginateOperator extends PhysicalPlan {
@Getter
private final PhysicalPlan input;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,4 @@ public boolean pushDownHighlight(LogicalHighlight highlight) {
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitTableScanBuilder(this, context);
}

public void pushDownOffset(int i) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.storage;
package org.opensearch.sql.opensearch.request;

import static org.opensearch.sql.opensearch.request.OpenSearchScrollRequest.DEFAULT_SCROLL_TIMEOUT;

Expand All @@ -17,9 +17,9 @@
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;

@EqualsAndHashCode
public class ContinueScrollRequest implements OpenSearchRequest {
final String initialScrollId;

Expand All @@ -39,9 +39,7 @@ public ContinueScrollRequest(String scrollId, OpenSearchExprValueFactory exprVal
@Override
public OpenSearchResponse search(Function<SearchRequest, SearchResponse> searchAction,
Function<SearchScrollRequest, SearchResponse> scrollAction) {
SearchResponse openSearchResponse;

openSearchResponse = scrollAction.apply(new SearchScrollRequest(initialScrollId)
SearchResponse openSearchResponse = scrollAction.apply(new SearchScrollRequest(initialScrollId)
.scroll(DEFAULT_SCROLL_TIMEOUT));

// TODO if terminated_early - something went wrong, e.g. no scroll returned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.storage;
package org.opensearch.sql.opensearch.request;

import static org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder.DEFAULT_QUERY_TIMEOUT;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.index.query.QueryBuilder;
Expand All @@ -22,8 +21,6 @@
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.request.OpenSearchScrollRequest;
import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser;

public class InitialPageRequestBuilder implements PagedRequestBuilder {
Expand All @@ -40,15 +37,16 @@ public class InitialPageRequestBuilder implements PagedRequestBuilder {
* @param settings other settings
* @param exprValueFactory value factory
*/
// TODO accept indexName as string (same way as `OpenSearchRequestBuilder` does)?
public InitialPageRequestBuilder(OpenSearchRequest.IndexName indexName,
int pageSize,
Settings settings,
Settings settings, // TODO: settings are not used - refactor?
OpenSearchExprValueFactory exprValueFactory) {
this.indexName = indexName;
this.sourceBuilder = new SearchSourceBuilder();
this.exprValueFactory = exprValueFactory;
this.querySize = pageSize;
sourceBuilder.from(0)
this.sourceBuilder = new SearchSourceBuilder()
.from(0)
.size(querySize)
.timeout(DEFAULT_QUERY_TIMEOUT);
}
Expand Down Expand Up @@ -85,9 +83,8 @@ public void pushDownHighlight(String field, Map<String, Literal> arguments) {
* Push down project expression to OpenSearch.
*/
public void pushDownProjects(Set<ReferenceExpression> projects) {
final Set<String> projectsSet =
projects.stream().map(ReferenceExpression::getAttr).collect(Collectors.toSet());
sourceBuilder.fetchSource(projectsSet.toArray(new String[0]), new String[0]);
sourceBuilder.fetchSource(projects.stream().map(ReferenceExpression::getAttr)
.distinct().toArray(String[]::new), new String[0]);
}

public void pushTypeMapping(Map<String, ExprType> typeMapping) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

package org.opensearch.sql.opensearch.request;

import static org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder.DEFAULT_QUERY_TIMEOUT;

import com.google.common.annotations.VisibleForTesting;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -15,7 +17,6 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
Expand All @@ -32,11 +33,6 @@
@ToString
public class OpenSearchQueryRequest implements OpenSearchRequest {

/**
* Default query timeout in minutes.
*/
public static final TimeValue DEFAULT_QUERY_TIMEOUT = TimeValue.timeValueMinutes(1L);

/**
* {@link OpenSearchRequest.IndexName}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
@EqualsAndHashCode
@Getter
@ToString
// TODO make an interface which defines all pushDown functions?
public class OpenSearchRequestBuilder {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.storage;

import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.request.OpenSearchScrollRequest;
package org.opensearch.sql.opensearch.request;

public interface PagedRequestBuilder {
OpenSearchRequest build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,22 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.storage;
package org.opensearch.sql.opensearch.request;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;

@RequiredArgsConstructor
public class SubsequentPageRequestBuilder implements PagedRequestBuilder {
final OpenSearchRequest.IndexName indexName;
final String scrollId;
final OpenSearchExprValueFactory exprValueFactory;

@Getter
private final OpenSearchRequest.IndexName indexName;
private final String scrollId;
private final OpenSearchExprValueFactory exprValueFactory;

@Override
public OpenSearchRequest build() {
return new ContinueScrollRequest(scrollId, exprValueFactory);
}

@Override
public OpenSearchRequest.IndexName getIndexName() {
return indexName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
import org.opensearch.sql.opensearch.planner.physical.ADOperator;
import org.opensearch.sql.opensearch.planner.physical.MLCommonsOperator;
import org.opensearch.sql.opensearch.planner.physical.MLOperator;
import org.opensearch.sql.opensearch.request.InitialPageRequestBuilder;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder;
import org.opensearch.sql.opensearch.request.system.OpenSearchDescribeIndexRequest;
import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexScan;
import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexScanBuilder;
import org.opensearch.sql.opensearch.storage.scan.OpenSearchPagedIndexScan;
import org.opensearch.sql.opensearch.storage.scan.OpenSearchPagedIndexScanBuilder;
import org.opensearch.sql.planner.DefaultImplementor;
import org.opensearch.sql.planner.logical.LogicalAD;
import org.opensearch.sql.planner.logical.LogicalML;
Expand Down Expand Up @@ -132,7 +136,7 @@ public TableScanBuilder createPagedScanBuilder(int pageSize) {
var requestBuilder = new InitialPageRequestBuilder(indexName, pageSize,
settings, new OpenSearchExprValueFactory(getFieldTypes()));
var indexScan = new OpenSearchPagedIndexScan(client, requestBuilder);
return new OpenSearchPagedScanBuilder(indexScan);
return new OpenSearchPagedIndexScanBuilder(indexScan);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.request.SubsequentPageRequestBuilder;
import org.opensearch.sql.opensearch.storage.scan.OpenSearchPagedIndexScan;
import org.opensearch.sql.opensearch.storage.system.OpenSearchSystemIndex;
import org.opensearch.sql.storage.StorageEngine;
import org.opensearch.sql.storage.Table;
Expand All @@ -39,6 +41,7 @@ public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name) {

@Override
public TableScanOperator getTableScan(String indexName, String scrollId) {
// TODO call `getTable` here?
var index = new OpenSearchIndex(client, settings, indexName);
var requestBuilder = new SubsequentPageRequestBuilder(
new OpenSearchRequest.IndexName(indexName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/


package org.opensearch.sql.opensearch.storage;
package org.opensearch.sql.opensearch.storage.scan;

import java.util.Collections;
import java.util.Iterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.opensearch.sql.expression.aggregation.NamedAggregator;
import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer;
import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser;
import org.opensearch.sql.opensearch.storage.OpenSearchIndexScan;
import org.opensearch.sql.opensearch.storage.script.aggregation.AggregationQueryBuilder;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalSort;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.google.common.annotations.VisibleForTesting;
import lombok.EqualsAndHashCode;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.opensearch.storage.OpenSearchIndexScan;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalHighlight;
Expand Down Expand Up @@ -92,11 +91,6 @@ public boolean pushDownProject(LogicalProject project) {
return delegate.pushDownProject(project);
}

@Override
public boolean pushDownOffset(int i) {
return delegate.pushDownOffset(i);
}

@Override
public boolean pushDownHighlight(LogicalHighlight highlight) {
return delegate.pushDownHighlight(highlight);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer;
import org.opensearch.sql.opensearch.storage.OpenSearchIndexScan;
import org.opensearch.sql.opensearch.storage.script.filter.FilterQueryBuilder;
import org.opensearch.sql.opensearch.storage.script.sort.SortQueryBuilder;
import org.opensearch.sql.planner.logical.LogicalFilter;
Expand All @@ -38,10 +37,6 @@
*/
@VisibleForTesting
class OpenSearchIndexScanQueryBuilder extends TableScanBuilder {
@Override
public void pushDownOffset(int i) {
indexScan.getRequestBuilder().getSourceBuilder().from(i);
}

/** OpenSearch index scan to be optimized. */
@EqualsAndHashCode.Include
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.storage;
package org.opensearch.sql.opensearch.storage.scan;

import java.util.Collections;
import java.util.Iterator;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.commons.lang3.NotImplementedException;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.request.PagedRequestBuilder;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
import org.opensearch.sql.storage.TableScanOperator;

Expand All @@ -33,7 +35,7 @@ public OpenSearchPagedIndexScan(OpenSearchClient client,

@Override
public String explain() {
throw new RuntimeException("Implement OpenSearchPagedIndexScan.explain");
throw new NotImplementedException("Implement OpenSearchPagedIndexScan.explain");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,25 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.storage;
package org.opensearch.sql.opensearch.storage.scan;

import lombok.EqualsAndHashCode;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* Builder for a paged opensearch request.
* Override pushDown* methods from TableScaneBuilder as more features
* Builder for a paged OpenSearch request.
* Override pushDown* methods from TableScanBuilder as more features
* support pagination.
*/
public class OpenSearchPagedScanBuilder extends TableScanBuilder {
public class OpenSearchPagedIndexScanBuilder extends TableScanBuilder {
@EqualsAndHashCode.Include
OpenSearchPagedIndexScan indexScan;

public OpenSearchPagedScanBuilder(OpenSearchPagedIndexScan indexScan) {
public OpenSearchPagedIndexScanBuilder(OpenSearchPagedIndexScan indexScan) {
this.indexScan = indexScan;
}


@Override
public TableScanOperator build() {
return indexScan;
Expand Down
Loading