diff --git a/core/src/main/java/org/opensearch/sql/analysis/AnalysisContext.java b/core/src/main/java/org/opensearch/sql/analysis/AnalysisContext.java index 2d3ee1a52c..73ae40f41d 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/AnalysisContext.java +++ b/core/src/main/java/org/opensearch/sql/analysis/AnalysisContext.java @@ -11,6 +11,7 @@ import java.util.Objects; import lombok.Getter; import org.opensearch.sql.expression.NamedExpression; +import org.opensearch.sql.planner.PlanContext; /** * The context used for Analyzer. @@ -20,16 +21,35 @@ public class AnalysisContext { * Environment stack for symbol scope management. */ private TypeEnvironment environment; + + /** + * Context for physical plan building. + */ + @Getter + private PlanContext planContext; + @Getter private final List namedParseExpressions; public AnalysisContext() { - this(new TypeEnvironment(null)); + this(new TypeEnvironment(null), new PlanContext()); + } + + public AnalysisContext(PlanContext planContext) { + this (new TypeEnvironment(null), planContext); } public AnalysisContext(TypeEnvironment environment) { + this(environment, new PlanContext()); + } + + /** + * Constructor. + */ + public AnalysisContext(TypeEnvironment environment, PlanContext planContext) { this.environment = environment; this.namedParseExpressions = new ArrayList<>(); + this.planContext = planContext; } /** diff --git a/core/src/main/java/org/opensearch/sql/planner/PlanContext.java b/core/src/main/java/org/opensearch/sql/planner/PlanContext.java new file mode 100644 index 0000000000..a6c30aa77f --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/PlanContext.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner; + +import lombok.Getter; +import lombok.Setter; + +/** + * PlanContext that stores context for physical plan generation. + */ +public class PlanContext { + @Getter + @Setter + private IndexScanType indexScanType; + + public PlanContext() { + this.indexScanType = IndexScanType.QUERY; + } + + public enum IndexScanType { + /** + * default query request. + */ + QUERY, + + /** + * scroll request. + */ + SCROLL + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/Planner.java b/core/src/main/java/org/opensearch/sql/planner/Planner.java index 803b2d1931..ebaed1f0f4 100644 --- a/core/src/main/java/org/opensearch/sql/planner/Planner.java +++ b/core/src/main/java/org/opensearch/sql/planner/Planner.java @@ -36,10 +36,11 @@ public class Planner { * translate logical plan to physical by default implementor. * TODO: for now just delegate entire logical plan to storage engine. * - * @param plan logical plan + * @param plan logical plan + * @param context plan context * @return optimal physical plan */ - public PhysicalPlan plan(LogicalPlan plan) { + public PhysicalPlan plan(LogicalPlan plan, PlanContext context) { String tableName = findTableName(plan); if (isNullOrEmpty(tableName)) { return plan.accept(new DefaultImplementor<>(), null); @@ -47,7 +48,8 @@ public PhysicalPlan plan(LogicalPlan plan) { Table table = storageEngine.getTable(tableName); return table.implement( - table.optimize(optimize(plan))); + table.optimize(optimize(plan)), + context); } private String findTableName(LogicalPlan plan) { diff --git a/core/src/main/java/org/opensearch/sql/storage/Table.java b/core/src/main/java/org/opensearch/sql/storage/Table.java index 731cf878c6..886e54705d 100644 --- a/core/src/main/java/org/opensearch/sql/storage/Table.java +++ b/core/src/main/java/org/opensearch/sql/storage/Table.java @@ -8,6 +8,7 @@ import java.util.Map; import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; @@ -24,10 +25,11 @@ public interface Table { /** * Implement a {@link LogicalPlan} by {@link PhysicalPlan} in storage engine. * - * @param plan logical plan + * @param plan logical plan + * @param context plan context * @return physical plan */ - PhysicalPlan implement(LogicalPlan plan); + PhysicalPlan implement(LogicalPlan plan, PlanContext context); /** * Optimize the {@link LogicalPlan} by storage engine rule. diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java index 09ddca1645..c0ce396503 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java @@ -21,6 +21,7 @@ import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.expression.env.Environment; import org.opensearch.sql.expression.function.BuiltinFunctionRepository; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.StorageEngine; @@ -47,7 +48,7 @@ public Map getFieldTypes() { } @Override - public PhysicalPlan implement(LogicalPlan plan) { + public PhysicalPlan implement(LogicalPlan plan, PlanContext context) { throw new UnsupportedOperationException(); } }; diff --git a/core/src/test/java/org/opensearch/sql/config/TestConfig.java b/core/src/test/java/org/opensearch/sql/config/TestConfig.java index ab78109aa2..1a754dc4d1 100644 --- a/core/src/test/java/org/opensearch/sql/config/TestConfig.java +++ b/core/src/test/java/org/opensearch/sql/config/TestConfig.java @@ -17,6 +17,7 @@ import org.opensearch.sql.expression.Expression; import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.expression.env.Environment; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.StorageEngine; @@ -69,7 +70,7 @@ public Map getFieldTypes() { } @Override - public PhysicalPlan implement(LogicalPlan plan) { + public PhysicalPlan implement(LogicalPlan plan, PlanContext context) { throw new UnsupportedOperationException(); } }; diff --git a/core/src/test/java/org/opensearch/sql/planner/PlannerTest.java b/core/src/test/java/org/opensearch/sql/planner/PlannerTest.java index c34091dbf7..f7fdc00cba 100644 --- a/core/src/test/java/org/opensearch/sql/planner/PlannerTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/PlannerTest.java @@ -84,7 +84,8 @@ public void planner_test() { ImmutableList.of() ), ImmutableMap.of(DSL.ref("ivalue", INTEGER), DSL.ref("avg(response)", DOUBLE)) - ) + ), + new PlanContext() ); } @@ -105,16 +106,19 @@ public void plan_a_query_without_relation_involved() { DSL.named("123", DSL.literal(123)), DSL.named("hello", DSL.literal("hello")), DSL.named("false", DSL.literal(false)) - ) + ), + new PlanContext() ); } - protected void assertPhysicalPlan(PhysicalPlan expected, LogicalPlan logicalPlan) { - assertEquals(expected, analyze(logicalPlan)); + protected void assertPhysicalPlan(PhysicalPlan expected, + LogicalPlan logicalPlan, + PlanContext planContext) { + assertEquals(expected, analyze(logicalPlan, planContext)); } - protected PhysicalPlan analyze(LogicalPlan logicalPlan) { - return new Planner(storageEngine, optimizer).plan(logicalPlan); + protected PhysicalPlan analyze(LogicalPlan logicalPlan, PlanContext planContext) { + return new Planner(storageEngine, optimizer).plan(logicalPlan, planContext); } protected class MockTable extends LogicalPlanNodeVisitor implements Table { @@ -125,7 +129,7 @@ public Map getFieldTypes() { } @Override - public PhysicalPlan implement(LogicalPlan plan) { + public PhysicalPlan implement(LogicalPlan plan, PlanContext context) { return plan.accept(this, null); } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java index 51484feda7..da1308083b 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java @@ -30,6 +30,7 @@ import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.opensearch.security.SecurityAccess; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.protocol.response.QueryResult; import org.opensearch.sql.protocol.response.format.CsvResponseFormatter; @@ -101,9 +102,12 @@ public RestChannelConsumer prepareRequest(SQLQueryRequest request, NodeClient no try { // For now analyzing and planning stage may throw syntax exception as well // which hints the fallback to legacy code is necessary here. + PlanContext context = new PlanContext(); plan = sqlService.plan( - sqlService.analyze( - sqlService.parse(request.getQuery()))); + sqlService.analyze( + sqlService.parse(request.getQuery()), + context + ), context); } catch (SyntaxCheckException e) { // When explain, print info log for what unsupported syntax is causing fallback to old engine if (request.isExplainRequest()) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index 49301cbf53..a54c575f7f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -32,6 +32,7 @@ import org.opensearch.sql.opensearch.storage.script.sort.SortQueryBuilder; import org.opensearch.sql.opensearch.storage.serialization.DefaultExpressionSerializer; import org.opensearch.sql.planner.DefaultImplementor; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.logical.LogicalAD; import org.opensearch.sql.planner.logical.LogicalMLCommons; import org.opensearch.sql.planner.logical.LogicalPlan; @@ -83,8 +84,8 @@ public Map getFieldTypes() { * TODO: Push down operations to index scan operator as much as possible in future. */ @Override - public PhysicalPlan implement(LogicalPlan plan) { - OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, settings, indexName, + public PhysicalPlan implement(LogicalPlan plan, PlanContext context) { + OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, settings, indexName, context, new OpenSearchExprValueFactory(getFieldTypes())); /* diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java index c35a5ba9db..6e9d795282 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java @@ -9,8 +9,7 @@ import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME; import static org.opensearch.search.sort.SortOrder.ASC; -import com.google.common.collect.Iterables; -import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -34,8 +33,10 @@ import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.request.OpenSearchQueryRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequest; +import org.opensearch.sql.opensearch.request.OpenSearchScrollRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.storage.TableScanOperator; /** @@ -62,37 +63,40 @@ public class OpenSearchIndexScan extends TableScanOperator { */ public OpenSearchIndexScan(OpenSearchClient client, Settings settings, String indexName, - OpenSearchExprValueFactory exprValueFactory) { - this(client, settings, new OpenSearchRequest.IndexName(indexName), exprValueFactory); + PlanContext context, OpenSearchExprValueFactory exprValueFactory) { + this(client, settings, new OpenSearchRequest.IndexName(indexName), context, exprValueFactory); } /** * Constructor. */ public OpenSearchIndexScan(OpenSearchClient client, - Settings settings, OpenSearchRequest.IndexName indexName, - OpenSearchExprValueFactory exprValueFactory) { + Settings settings, OpenSearchRequest.IndexName indexName, + PlanContext context, OpenSearchExprValueFactory exprValueFactory) { this.client = client; - this.request = new OpenSearchQueryRequest(indexName, - settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT), exprValueFactory); + switch (context.getIndexScanType()) { + case SCROLL: + this.request = new OpenSearchScrollRequest(indexName, exprValueFactory); + break; + case QUERY: + default: + this.request = new OpenSearchQueryRequest(indexName, + settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT), exprValueFactory); + } } @Override public void open() { super.open(); - - // For now pull all results immediately once open - List responses = new ArrayList<>(); - OpenSearchResponse response = client.search(request); - while (!response.isEmpty()) { - responses.add(response); - response = client.search(request); - } - iterator = Iterables.concat(responses.toArray(new OpenSearchResponse[0])).iterator(); + iterator = Collections.emptyIterator(); + fetchNextBatch(); } @Override public boolean hasNext() { + if (!iterator.hasNext()) { + fetchNextBatch(); + } return iterator.hasNext(); } @@ -101,6 +105,13 @@ public ExprValue next() { return iterator.next(); } + private void fetchNextBatch() { + OpenSearchResponse response = client.search(request); + if (!response.isEmpty()) { + iterator = response.iterator(); + } + } + /** * Push down query to DSL request. * @param query query request diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndex.java index edd5593f4d..42c662ed6a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndex.java @@ -18,6 +18,7 @@ import org.opensearch.sql.opensearch.request.system.OpenSearchDescribeIndexRequest; import org.opensearch.sql.opensearch.request.system.OpenSearchSystemRequest; import org.opensearch.sql.planner.DefaultImplementor; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.logical.LogicalRelation; import org.opensearch.sql.planner.physical.PhysicalPlan; @@ -44,7 +45,7 @@ public Map getFieldTypes() { } @Override - public PhysicalPlan implement(LogicalPlan plan) { + public PhysicalPlan implement(LogicalPlan plan, PlanContext context) { return plan.accept(new OpenSearchSystemIndexDefaultImplementor(), null); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java index 24c305a75e..909d29f58c 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java @@ -40,6 +40,7 @@ import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.executor.protector.OpenSearchExecutionProtector; import org.opensearch.sql.opensearch.storage.OpenSearchIndexScan; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.TableScanOperator; @@ -124,9 +125,10 @@ public void onFailure(Exception e) { void explainSuccessfully() { OpenSearchExecutionEngine executor = new OpenSearchExecutionEngine(client, protector); Settings settings = mock(Settings.class); + PlanContext context = new PlanContext(); when(settings.getSettingValue(QUERY_SIZE_LIMIT)).thenReturn(100); PhysicalPlan plan = new OpenSearchIndexScan(mock(OpenSearchClient.class), - settings, "test", mock(OpenSearchExprValueFactory.class)); + settings, "test", context, mock(OpenSearchExprValueFactory.class)); AtomicReference result = new AtomicReference<>(); executor.explain(plan, new ResponseListener() { diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java index 5bffa1cfa8..80ba51c1de 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java @@ -36,7 +36,6 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.client.node.NodeClient; -import org.opensearch.sql.ast.dsl.AstDSL; import org.opensearch.sql.ast.expression.DataType; import org.opensearch.sql.ast.expression.Literal; import org.opensearch.sql.ast.tree.RareTopN.CommandType; @@ -59,6 +58,7 @@ import org.opensearch.sql.opensearch.planner.physical.MLCommonsOperator; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.opensearch.storage.OpenSearchIndexScan; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlanDSL; @@ -77,11 +77,14 @@ class OpenSearchExecutionProtectorTest { @Mock private OpenSearchSettings settings; + private PlanContext context; + private OpenSearchExecutionProtector executionProtector; @BeforeEach public void setup() { executionProtector = new OpenSearchExecutionProtector(resourceMonitor); + context = new PlanContext(); } @Test @@ -124,7 +127,7 @@ public void testProtectIndexScan() { filter( resourceMonitor( new OpenSearchIndexScan( - client, settings, indexName, + client, settings, indexName, context, exprValueFactory)), filterExpr), aggregators, @@ -152,7 +155,7 @@ public void testProtectIndexScan() { PhysicalPlanDSL.agg( filter( new OpenSearchIndexScan( - client, settings, indexName, + client, settings, indexName, context, exprValueFactory), filterExpr), aggregators, diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScanTest.java index 429c639da9..18cb632fa2 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScanTest.java @@ -39,6 +39,7 @@ import org.opensearch.sql.opensearch.request.OpenSearchQueryRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; +import org.opensearch.sql.planner.PlanContext; @ExtendWith(MockitoExtension.class) class OpenSearchIndexScanTest { @@ -49,6 +50,8 @@ class OpenSearchIndexScanTest { @Mock private Settings settings; + private PlanContext context = new PlanContext(); + private OpenSearchExprValueFactory exprValueFactory = new OpenSearchExprValueFactory( ImmutableMap.of("name", STRING, "department", STRING)); @@ -61,7 +64,7 @@ void setup() { void queryEmptyResult() { mockResponse(); try (OpenSearchIndexScan indexScan = - new OpenSearchIndexScan(client, settings, "test", exprValueFactory)) { + new OpenSearchIndexScan(client, settings, "test", context, exprValueFactory)) { indexScan.open(); assertFalse(indexScan.hasNext()); } @@ -75,7 +78,7 @@ void queryAllResults() { new ExprValue[]{employee(3, "Allen", "IT")}); try (OpenSearchIndexScan indexScan = - new OpenSearchIndexScan(client, settings, "employees", exprValueFactory)) { + new OpenSearchIndexScan(client, settings, "employees", context, exprValueFactory)) { indexScan.open(); assertTrue(indexScan.hasNext()); @@ -111,7 +114,7 @@ void pushDownFilters() { } private PushDownAssertion assertThat() { - return new PushDownAssertion(client, exprValueFactory, settings); + return new PushDownAssertion(client, exprValueFactory, settings, context); } private static class PushDownAssertion { @@ -122,9 +125,10 @@ private static class PushDownAssertion { public PushDownAssertion(OpenSearchClient client, OpenSearchExprValueFactory valueFactory, - Settings settings) { + Settings settings, + PlanContext context) { this.client = client; - this.indexScan = new OpenSearchIndexScan(client, settings, "test", valueFactory); + this.indexScan = new OpenSearchIndexScan(client, settings, "test", context, valueFactory); this.response = mock(OpenSearchResponse.class); this.factory = valueFactory; when(response.isEmpty()).thenReturn(true); @@ -160,10 +164,8 @@ public OpenSearchResponse answer(InvocationOnMock invocation) { when(response.isEmpty()).thenReturn(false); ExprValue[] searchHit = searchHitBatches[batchNum]; when(response.iterator()).thenReturn(Arrays.asList(searchHit).iterator()); - } else if (batchNum == totalBatch) { - when(response.isEmpty()).thenReturn(true); } else { - fail("Search request after empty response returned already"); + when(response.isEmpty()).thenReturn(true); } batchNum++; diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java index 847ac8dfc0..8c9ff17f62 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java @@ -62,6 +62,7 @@ import org.opensearch.sql.opensearch.data.type.OpenSearchDataType; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.mapping.IndexMapping; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.logical.LogicalPlanDSL; import org.opensearch.sql.planner.physical.AggregationOperator; @@ -137,10 +138,11 @@ void implementRelationOperatorOnly() { String indexName = "test"; LogicalPlan plan = relation(indexName); + PlanContext context = new PlanContext(); Table index = new OpenSearchIndex(client, settings, indexName); assertEquals( - new OpenSearchIndexScan(client, settings, indexName, exprValueFactory), - index.implement(plan)); + new OpenSearchIndexScan(client, settings, indexName, context, exprValueFactory), + index.implement(plan, context)); } @Test @@ -149,10 +151,11 @@ void implementRelationOperatorWithOptimization() { String indexName = "test"; LogicalPlan plan = relation(indexName); + PlanContext context = new PlanContext(); Table index = new OpenSearchIndex(client, settings, indexName); assertEquals( - new OpenSearchIndexScan(client, settings, indexName, exprValueFactory), - index.implement(index.optimize(plan))); + new OpenSearchIndexScan(client, settings, indexName, context, exprValueFactory), + index.implement(index.optimize(plan), context)); } @Test @@ -191,6 +194,7 @@ void implementOtherLogicalOperators() { dedupeField), include); + PlanContext context = new PlanContext(); Table index = new OpenSearchIndex(client, settings, indexName); assertEquals( PhysicalPlanDSL.project( @@ -199,7 +203,7 @@ void implementOtherLogicalOperators() { PhysicalPlanDSL.eval( PhysicalPlanDSL.remove( PhysicalPlanDSL.rename( - new OpenSearchIndexScan(client, settings, indexName, + new OpenSearchIndexScan(client, settings, indexName, context, exprValueFactory), mappings), exclude), @@ -207,7 +211,7 @@ void implementOtherLogicalOperators() { sortField), dedupeField), include), - index.implement(plan)); + index.implement(plan, context)); } @Test @@ -226,7 +230,8 @@ void shouldImplLogicalIndexScan() { indexName, filterExpr ), - named)); + named), + new PlanContext()); assertTrue(plan instanceof ProjectOperator); assertTrue(((ProjectOperator) plan).getInput() instanceof OpenSearchIndexScan); @@ -252,7 +257,8 @@ void shouldNotPushDownFilterFarFromRelation() { aggregators, groupByExprs ), - filterExpr)); + filterExpr), + new PlanContext()); assertTrue(plan instanceof FilterOperator); } @@ -279,7 +285,8 @@ void shouldImplLogicalIndexScanAgg() { aggregators, groupByExprs ), - filterExpr)); + filterExpr), + new PlanContext()); assertTrue(plan.getChild().get(0) instanceof OpenSearchIndexScan); @@ -289,7 +296,8 @@ void shouldImplLogicalIndexScanAgg() { indexName, filterExpr, aggregators, - groupByExprs)); + groupByExprs), + new PlanContext()); assertTrue(plan instanceof OpenSearchIndexScan); } @@ -313,7 +321,8 @@ void shouldNotPushDownAggregationFarFromRelation() { relation(indexName), filterExpr), filterExpr), aggregators, - groupByExprs)); + groupByExprs), + new PlanContext()); assertTrue(plan instanceof AggregationOperator); } @@ -333,7 +342,8 @@ void shouldImplIndexScanWithSort() { indexName, Pair.of(Sort.SortOption.DEFAULT_ASC, sortExpr) ), - named)); + named), + new PlanContext()); assertTrue(plan instanceof ProjectOperator); assertTrue(((ProjectOperator) plan).getInput() instanceof OpenSearchIndexScan); @@ -354,7 +364,8 @@ void shouldImplIndexScanWithLimit() { indexName, 1, 1, noProjects() ), - named)); + named), + new PlanContext()); assertTrue(plan instanceof ProjectOperator); assertTrue(((ProjectOperator) plan).getInput() instanceof OpenSearchIndexScan); @@ -378,7 +389,8 @@ void shouldImplIndexScanWithSortAndLimit() { 1, 1, noProjects() ), - named)); + named), + new PlanContext()); assertTrue(plan instanceof ProjectOperator); assertTrue(((ProjectOperator) plan).getInput() instanceof OpenSearchIndexScan); @@ -402,7 +414,7 @@ void shouldNotPushDownLimitFarFromRelationButUpdateScanSize() { ), named("intV", ref("intV", INTEGER)) ) - )); + ), new PlanContext()); assertTrue(plan instanceof ProjectOperator); assertTrue(((ProjectOperator) plan).getInput() instanceof LimitOperator); @@ -419,7 +431,8 @@ void shouldPushDownProjects() { indexScan( indexName, projects(ref("intV", INTEGER)) ), - named("i", ref("intV", INTEGER)))); + named("i", ref("intV", INTEGER))), + new PlanContext()); assertTrue(plan instanceof ProjectOperator); assertTrue(((ProjectOperator) plan).getInput() instanceof OpenSearchIndexScan); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndexTest.java index 685d3e33af..3cb7ce0d36 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndexTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndexTest.java @@ -26,6 +26,7 @@ import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.expression.NamedExpression; import org.opensearch.sql.opensearch.client.OpenSearchClient; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.ProjectOperator; @@ -58,12 +59,13 @@ void testGetFieldTypesOfMappingTable() { void implement() { OpenSearchSystemIndex systemIndex = new OpenSearchSystemIndex(client, TABLE_INFO); NamedExpression projectExpr = named("TABLE_NAME", ref("TABLE_NAME", STRING)); + PlanContext planContext = new PlanContext(); final PhysicalPlan plan = systemIndex.implement( project( relation(TABLE_INFO), projectExpr - )); + ), planContext); assertTrue(plan instanceof ProjectOperator); assertTrue(plan.getChild().get(0) instanceof OpenSearchSystemIndexScan); } diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java index a1a831c7cd..0a7c04c276 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java @@ -21,6 +21,7 @@ import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.function.BuiltinFunctionRepository; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.Planner; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; @@ -86,13 +87,15 @@ private PhysicalPlan plan(PPLQueryRequest request) { LOG.info("[{}] Incoming request {}", LogUtils.getRequestId(), anonymizer.anonymizeData(ast)); + PlanContext planContext = new PlanContext(); + // 2.Analyze abstract syntax to generate logical plan LogicalPlan logicalPlan = analyzer.analyze(UnresolvedPlanHelper.addSelectAll(ast), - new AnalysisContext()); + new AnalysisContext(planContext)); // 3.Generate optimal physical plan from logical plan return new Planner(storageEngine, LogicalPlanOptimizer.create(new DSL(repository))) - .plan(logicalPlan); + .plan(logicalPlan, planContext); } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java index 7f28aeee40..d114cf3b5d 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java @@ -24,6 +24,7 @@ import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; import org.opensearch.sql.executor.ExecutionEngine.ExplainResponseNode; import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.ppl.config.PPLServiceConfig; import org.opensearch.sql.ppl.domain.PPLQueryRequest; @@ -58,7 +59,7 @@ public class PPLServiceTest { @Before public void setUp() { when(table.getFieldTypes()).thenReturn(ImmutableMap.of("a", ExprCoreType.INTEGER)); - when(table.implement(any())).thenReturn(plan); + when(table.implement(any(), any())).thenReturn(plan); when(storageEngine.getTable(any())).thenReturn(table); context.registerBean(StorageEngine.class, () -> storageEngine); diff --git a/sql/src/main/java/org/opensearch/sql/sql/SQLService.java b/sql/src/main/java/org/opensearch/sql/sql/SQLService.java index 991e9df12a..f367f14a0d 100644 --- a/sql/src/main/java/org/opensearch/sql/sql/SQLService.java +++ b/sql/src/main/java/org/opensearch/sql/sql/SQLService.java @@ -17,6 +17,7 @@ import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.function.BuiltinFunctionRepository; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.Planner; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; @@ -49,10 +50,15 @@ public class SQLService { */ public void execute(SQLQueryRequest request, ResponseListener listener) { try { + PlanContext context = new PlanContext(); executionEngine.execute( plan( analyze( - parse(request.getQuery()))), listener); + parse(request.getQuery()), + context + ), + context + ), listener); } catch (Exception e) { listener.onFailure(e); } @@ -95,16 +101,16 @@ public UnresolvedPlan parse(String query) { /** * Analyze abstract syntax to generate logical plan. */ - public LogicalPlan analyze(UnresolvedPlan ast) { - return analyzer.analyze(ast, new AnalysisContext()); + public LogicalPlan analyze(UnresolvedPlan ast, PlanContext planContext) { + return analyzer.analyze(ast, new AnalysisContext(planContext)); } /** * Generate optimal physical plan from logical plan. */ - public PhysicalPlan plan(LogicalPlan logicalPlan) { + public PhysicalPlan plan(LogicalPlan logicalPlan, PlanContext planContext) { return new Planner(storageEngine, LogicalPlanOptimizer.create(new DSL(repository))) - .plan(logicalPlan); + .plan(logicalPlan, planContext); } }