Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds plan context for Scroll physical plan building #713

Closed
wants to merge 10 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Objects;
import lombok.Getter;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.planner.PlanContext;

/**
* The context used for Analyzer.
Expand All @@ -20,16 +21,35 @@ public class AnalysisContext {
* Environment stack for symbol scope management.
*/
private TypeEnvironment environment;

/**
* Context for physical plan building.
*/
@Getter
private PlanContext planContext;

@Getter
private final List<NamedExpression> namedParseExpressions;

public AnalysisContext() {
this(new TypeEnvironment(null));
this(new TypeEnvironment(null), new PlanContext());
}

public AnalysisContext(PlanContext planContext) {
this (new TypeEnvironment(null), planContext);
}

public AnalysisContext(TypeEnvironment environment) {
this(environment, new PlanContext());
}

/**
* Constructor.
*/
public AnalysisContext(TypeEnvironment environment, PlanContext planContext) {
this.environment = environment;
this.namedParseExpressions = new ArrayList<>();
this.planContext = planContext;
}

/**
Expand Down
34 changes: 34 additions & 0 deletions core/src/main/java/org/opensearch/sql/planner/PlanContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner;

import lombok.Getter;
import lombok.Setter;

/**
* PlanContext that stores context for physical plan generation.
*/
public class PlanContext {
@Getter
@Setter
private IndexScanType indexScanType;

public PlanContext() {
this.indexScanType = IndexScanType.QUERY;
}

public enum IndexScanType {
/**
* default query request.
*/
QUERY,

/**
* scroll request.
*/
SCROLL
}
}
8 changes: 5 additions & 3 deletions core/src/main/java/org/opensearch/sql/planner/Planner.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,20 @@ public class Planner {
* translate logical plan to physical by default implementor.
* TODO: for now just delegate entire logical plan to storage engine.
*
* @param plan logical plan
* @param plan logical plan
* @param context plan context
* @return optimal physical plan
*/
public PhysicalPlan plan(LogicalPlan plan) {
public PhysicalPlan plan(LogicalPlan plan, PlanContext context) {
String tableName = findTableName(plan);
if (isNullOrEmpty(tableName)) {
return plan.accept(new DefaultImplementor<>(), null);
}

Table table = storageEngine.getTable(tableName);
return table.implement(
table.optimize(optimize(plan)));
table.optimize(optimize(plan)),
context);
}

private String findTableName(LogicalPlan plan) {
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/java/org/opensearch/sql/storage/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.util.Map;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;

Expand All @@ -24,10 +25,11 @@ public interface Table {
/**
* Implement a {@link LogicalPlan} by {@link PhysicalPlan} in storage engine.
*
* @param plan logical plan
* @param plan logical plan
* @param context plan context
* @return physical plan
*/
PhysicalPlan implement(LogicalPlan plan);
PhysicalPlan implement(LogicalPlan plan, PlanContext context);

/**
* Optimize the {@link LogicalPlan} by storage engine rule.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.env.Environment;
import org.opensearch.sql.expression.function.BuiltinFunctionRepository;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.StorageEngine;
Expand All @@ -47,7 +48,7 @@ public Map<String, ExprType> getFieldTypes() {
}

@Override
public PhysicalPlan implement(LogicalPlan plan) {
public PhysicalPlan implement(LogicalPlan plan, PlanContext context) {
throw new UnsupportedOperationException();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.env.Environment;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.StorageEngine;
Expand Down Expand Up @@ -69,7 +70,7 @@ public Map<String, ExprType> getFieldTypes() {
}

@Override
public PhysicalPlan implement(LogicalPlan plan) {
public PhysicalPlan implement(LogicalPlan plan, PlanContext context) {
throw new UnsupportedOperationException();
}
};
Expand Down
18 changes: 11 additions & 7 deletions core/src/test/java/org/opensearch/sql/planner/PlannerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public void planner_test() {
ImmutableList.of()
),
ImmutableMap.of(DSL.ref("ivalue", INTEGER), DSL.ref("avg(response)", DOUBLE))
)
),
new PlanContext()
);
}

Expand All @@ -105,16 +106,19 @@ public void plan_a_query_without_relation_involved() {
DSL.named("123", DSL.literal(123)),
DSL.named("hello", DSL.literal("hello")),
DSL.named("false", DSL.literal(false))
)
),
new PlanContext()
);
}

protected void assertPhysicalPlan(PhysicalPlan expected, LogicalPlan logicalPlan) {
assertEquals(expected, analyze(logicalPlan));
protected void assertPhysicalPlan(PhysicalPlan expected,
LogicalPlan logicalPlan,
PlanContext planContext) {
assertEquals(expected, analyze(logicalPlan, planContext));
}

protected PhysicalPlan analyze(LogicalPlan logicalPlan) {
return new Planner(storageEngine, optimizer).plan(logicalPlan);
protected PhysicalPlan analyze(LogicalPlan logicalPlan, PlanContext planContext) {
return new Planner(storageEngine, optimizer).plan(logicalPlan, planContext);
}

protected class MockTable extends LogicalPlanNodeVisitor<PhysicalPlan, Object> implements Table {
Expand All @@ -125,7 +129,7 @@ public Map<String, ExprType> getFieldTypes() {
}

@Override
public PhysicalPlan implement(LogicalPlan plan) {
public PhysicalPlan implement(LogicalPlan plan, PlanContext context) {
return plan.accept(this, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.opensearch.security.SecurityAccess;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.protocol.response.QueryResult;
import org.opensearch.sql.protocol.response.format.CsvResponseFormatter;
Expand Down Expand Up @@ -101,9 +102,12 @@ public RestChannelConsumer prepareRequest(SQLQueryRequest request, NodeClient no
try {
// For now analyzing and planning stage may throw syntax exception as well
// which hints the fallback to legacy code is necessary here.
PlanContext context = new PlanContext();
plan = sqlService.plan(
sqlService.analyze(
sqlService.parse(request.getQuery())));
sqlService.analyze(
sqlService.parse(request.getQuery()),
context
), context);
} catch (SyntaxCheckException e) {
// When explain, print info log for what unsupported syntax is causing fallback to old engine
if (request.isExplainRequest()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.opensearch.sql.opensearch.storage.script.sort.SortQueryBuilder;
import org.opensearch.sql.opensearch.storage.serialization.DefaultExpressionSerializer;
import org.opensearch.sql.planner.DefaultImplementor;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.logical.LogicalAD;
import org.opensearch.sql.planner.logical.LogicalMLCommons;
import org.opensearch.sql.planner.logical.LogicalPlan;
Expand Down Expand Up @@ -83,8 +84,8 @@ public Map<String, ExprType> getFieldTypes() {
* TODO: Push down operations to index scan operator as much as possible in future.
*/
@Override
public PhysicalPlan implement(LogicalPlan plan) {
OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, settings, indexName,
public PhysicalPlan implement(LogicalPlan plan, PlanContext context) {
OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, settings, indexName, context,
new OpenSearchExprValueFactory(getFieldTypes()));

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME;
import static org.opensearch.search.sort.SortOrder.ASC;

import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -34,8 +33,10 @@
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.request.OpenSearchQueryRequest;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.request.OpenSearchScrollRequest;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.storage.TableScanOperator;

/**
Expand All @@ -62,37 +63,40 @@ public class OpenSearchIndexScan extends TableScanOperator {
*/
public OpenSearchIndexScan(OpenSearchClient client,
Settings settings, String indexName,
OpenSearchExprValueFactory exprValueFactory) {
this(client, settings, new OpenSearchRequest.IndexName(indexName), exprValueFactory);
PlanContext context, OpenSearchExprValueFactory exprValueFactory) {
this(client, settings, new OpenSearchRequest.IndexName(indexName), context, exprValueFactory);
}

/**
* Constructor.
*/
public OpenSearchIndexScan(OpenSearchClient client,
Settings settings, OpenSearchRequest.IndexName indexName,
OpenSearchExprValueFactory exprValueFactory) {
Settings settings, OpenSearchRequest.IndexName indexName,
PlanContext context, OpenSearchExprValueFactory exprValueFactory) {
this.client = client;
this.request = new OpenSearchQueryRequest(indexName,
settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT), exprValueFactory);
switch (context.getIndexScanType()) {
case SCROLL:
this.request = new OpenSearchScrollRequest(indexName, exprValueFactory);
break;
case QUERY:
default:
this.request = new OpenSearchQueryRequest(indexName,
settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT), exprValueFactory);
}
}

@Override
public void open() {
super.open();

// For now pull all results immediately once open
List<OpenSearchResponse> responses = new ArrayList<>();
OpenSearchResponse response = client.search(request);
while (!response.isEmpty()) {
responses.add(response);
response = client.search(request);
}
iterator = Iterables.concat(responses.toArray(new OpenSearchResponse[0])).iterator();
iterator = Collections.emptyIterator();
fetchNextBatch();
}

@Override
public boolean hasNext() {
if (!iterator.hasNext()) {
fetchNextBatch();
}
return iterator.hasNext();
}

Expand All @@ -101,6 +105,13 @@ public ExprValue next() {
return iterator.next();
}

private void fetchNextBatch() {
OpenSearchResponse response = client.search(request);
if (!response.isEmpty()) {
iterator = response.iterator();
}
}

/**
* Push down query to DSL request.
* @param query query request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.sql.opensearch.request.system.OpenSearchDescribeIndexRequest;
import org.opensearch.sql.opensearch.request.system.OpenSearchSystemRequest;
import org.opensearch.sql.planner.DefaultImplementor;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.physical.PhysicalPlan;
Expand All @@ -44,7 +45,7 @@ public Map<String, ExprType> getFieldTypes() {
}

@Override
public PhysicalPlan implement(LogicalPlan plan) {
public PhysicalPlan implement(LogicalPlan plan, PlanContext context) {
return plan.accept(new OpenSearchSystemIndexDefaultImplementor(), null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.executor.protector.OpenSearchExecutionProtector;
import org.opensearch.sql.opensearch.storage.OpenSearchIndexScan;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.TableScanOperator;

Expand Down Expand Up @@ -124,9 +125,10 @@ public void onFailure(Exception e) {
void explainSuccessfully() {
OpenSearchExecutionEngine executor = new OpenSearchExecutionEngine(client, protector);
Settings settings = mock(Settings.class);
PlanContext context = new PlanContext();
when(settings.getSettingValue(QUERY_SIZE_LIMIT)).thenReturn(100);
PhysicalPlan plan = new OpenSearchIndexScan(mock(OpenSearchClient.class),
settings, "test", mock(OpenSearchExprValueFactory.class));
settings, "test", context, mock(OpenSearchExprValueFactory.class));

AtomicReference<ExplainResponse> result = new AtomicReference<>();
executor.explain(plan, new ResponseListener<ExplainResponse>() {
Expand Down
Loading