Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify OpenSearchIndexScanBuilder #275

Merged
merged 1 commit into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.common.setting.Settings;
Expand All @@ -33,7 +34,6 @@
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;

/** OpenSearch table (index) implementation. */
Expand Down Expand Up @@ -171,19 +171,14 @@ public PhysicalPlan implement(LogicalPlan plan) {
public TableScanBuilder createScanBuilder() {
final int querySizeLimit = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT);

final TimeValue cursorKeepAlive = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE);
var builder = new OpenSearchRequestBuilder(
querySizeLimit,
createExprValueFactory());

return new OpenSearchIndexScanBuilder(builder) {
@Override
protected TableScanOperator createScan(OpenSearchRequestBuilder requestBuilder) {
final TimeValue cursorKeepAlive =
settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE);
return new OpenSearchIndexScan(client, requestBuilder.getMaxResponseSize(),
requestBuilder.build(indexName, getMaxResultWindow(), cursorKeepAlive));
}
};
Function<OpenSearchRequestBuilder, OpenSearchIndexScan> createScanOperator =
requestBuilder -> new OpenSearchIndexScan(client, requestBuilder.getMaxResponseSize(),
requestBuilder.build(indexName, getMaxResultWindow(), cursorKeepAlive));
return new OpenSearchIndexScanBuilder(builder, createScanOperator);
}

private OpenSearchExprValueFactory createExprValueFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.opensearch.storage.scan;

import java.util.function.Function;
import lombok.EqualsAndHashCode;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder;
Expand All @@ -24,8 +25,9 @@
* by delegated builder internally. This is to avoid conditional check of different push down logic
* for non-aggregate and aggregate query everywhere.
*/
public abstract class OpenSearchIndexScanBuilder extends TableScanBuilder {
public class OpenSearchIndexScanBuilder extends TableScanBuilder {

private final Function<OpenSearchRequestBuilder, OpenSearchIndexScan> scanFactory;
/**
* Delegated index scan builder for non-aggregate or aggregate query.
*/
Expand All @@ -38,25 +40,27 @@ public abstract class OpenSearchIndexScanBuilder extends TableScanBuilder {
/**
* Constructor used during query execution.
*/
protected OpenSearchIndexScanBuilder(OpenSearchRequestBuilder requestBuilder) {
public OpenSearchIndexScanBuilder(OpenSearchRequestBuilder requestBuilder,
Function<OpenSearchRequestBuilder, OpenSearchIndexScan> scanFactory) {
this.delegate = new OpenSearchIndexScanQueryBuilder(requestBuilder);
this.scanFactory = scanFactory;

}

/**
* Constructor used for unit tests.
*/
protected OpenSearchIndexScanBuilder(PushDownQueryBuilder translator) {
protected OpenSearchIndexScanBuilder(PushDownQueryBuilder translator,
Function<OpenSearchRequestBuilder, OpenSearchIndexScan> scanFactory) {
this.delegate = translator;
this.scanFactory = scanFactory;
}

@Override
public TableScanOperator build() {
return createScan(delegate.build());
return scanFactory.apply(delegate.build());
}

protected abstract TableScanOperator createScan(OpenSearchRequestBuilder requestBuilder);

@Override
public boolean pushDownFilter(LogicalFilter filter) {
return delegate.pushDownFilter(filter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package org.opensearch.sql.opensearch.storage.scan;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -86,7 +85,6 @@
import org.opensearch.sql.planner.optimizer.PushDownPageSize;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.TableScanOperator;

@ExtendWith(MockitoExtension.class)
class OpenSearchIndexScanOptimizationTest {
Expand All @@ -106,12 +104,7 @@ class OpenSearchIndexScanOptimizationTest {

@BeforeEach
void setUp() {
indexScanBuilder = new OpenSearchIndexScanBuilder(requestBuilder) {
@Override
protected TableScanOperator createScan(OpenSearchRequestBuilder build) {
return indexScan;
}
};
indexScanBuilder = new OpenSearchIndexScanBuilder(requestBuilder, requestBuilder -> indexScan);
when(table.createScanBuilder()).thenReturn(indexScanBuilder);
}

Expand Down Expand Up @@ -698,23 +691,15 @@ void project_literal_should_not_be_pushed_down() {

private OpenSearchIndexScanBuilder indexScanBuilder(Runnable... verifyPushDownCalls) {
this.verifyPushDownCalls = verifyPushDownCalls;
return new OpenSearchIndexScanBuilder(new OpenSearchIndexScanQueryBuilder(requestBuilder)) {
@Override
protected TableScanOperator createScan(OpenSearchRequestBuilder build) {
return indexScan;
}
};
return new OpenSearchIndexScanBuilder(new OpenSearchIndexScanQueryBuilder(requestBuilder),
requestBuilder -> indexScan);
}

private OpenSearchIndexScanBuilder indexScanAggBuilder(Runnable... verifyPushDownCalls) {
this.verifyPushDownCalls = verifyPushDownCalls;
return new OpenSearchIndexScanBuilder(new OpenSearchIndexScanAggregationBuilder(
requestBuilder, mock(LogicalAggregation.class))) {
@Override
protected TableScanOperator createScan(OpenSearchRequestBuilder build) {
return indexScan;
}
};
var aggregationBuilder = new OpenSearchIndexScanAggregationBuilder(
requestBuilder, mock(LogicalAggregation.class));
return new OpenSearchIndexScanBuilder(aggregationBuilder, builder -> indexScan);
}

private void assertEqualsAfterOptimization(LogicalPlan expected, LogicalPlan actual) {
Expand Down