Skip to content

Commit

Permalink
Address PR comments.
Browse files Browse the repository at this point in the history
* Add javadocs
* Renames
* Cleaning up some comments
* Remove unused code
* Speed up IT

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
  • Loading branch information
Yury-Fridlyand committed Mar 24, 2023
1 parent ec5fb40 commit 4213388
Show file tree
Hide file tree
Showing 37 changed files with 243 additions and 264 deletions.
5 changes: 4 additions & 1 deletion core/src/main/java/org/opensearch/sql/ast/tree/Paginate.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;

/**
* AST node to represent pagination operation.
* Actually a wrapper to the AST.
*/
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
@ToString
Expand All @@ -38,7 +42,6 @@ public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
assert this.child == null;
this.child = child;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@

package org.opensearch.sql.exception;

/**
* This should be thrown by V2 engine to support fallback scenario.
*/
public class UnsupportedCursorRequestException extends RuntimeException {
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.sql.executor;

import java.util.concurrent.atomic.AtomicBoolean;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;
import org.opensearch.sql.ast.expression.AllFields;
Expand All @@ -15,13 +14,14 @@
/**
* Use this unresolved plan visitor to check if a plan can be serialized by PaginatedPlanCache.
* If plan.accept(new CanPaginateVisitor(...)) returns true,
* then PaginatedPlanCache.convertToCursor will succeed.
* Otherwise, it will fail.
* then PaginatedPlanCache.convertToCursor will succeed. Otherwise, it will fail.
* The purpose of this visitor is to activate legacy engine fallback mechanism.
* Currently, the conditions are:
* - only projection of a relation is supported.
* - projection only has * (a.k.a. allFields).
* - Relation only scans one table
* - The table is an open search index.
* So it accepts only queries like `select * from $index`
* See PaginatedPlanCache.canConvertToCursor for usage.
*/
public class CanPaginateVisitor extends AbstractNodeVisitor<Boolean, Object> {
Expand All @@ -39,42 +39,6 @@ public Boolean visitRelation(Relation node, Object context) {
return Boolean.TRUE;
}

/*
private Boolean canPaginate(Node node, Object context) {
AtomicBoolean result = new AtomicBoolean(true);
node.getChild().forEach(n -> result.set(result.get() && n.accept(this, context)));
return result.get();
}
For queries without `FROM` clause.
Required to overload `toCursor` function in `ValuesOperator` and modify cursor parsing.
@Override
public Boolean visitValues(Values node, Object context) {
return canPaginate(node, context);
}
For queries with LIMIT clause:
Required to overload `toCursor` function in `LimitOperator` and modify cursor parsing.
@Override
public Boolean visitLimit(Limit node, Object context) {
return canPaginate(node, context);
}
For queries with ORDER BY clause:
Required to overload `toCursor` function in `SortOperator` and modify cursor parsing.
@Override
public Boolean visitSort(Sort node, Object context) {
return canPaginate(node, context);
}
For queries with WHERE clause:
Required to overload `toCursor` function in `FilterOperator` and modify cursor parsing.
@Override
public Boolean visitFilter(Filter node, Object context) {
return canPaginate(node, context);
}
*/

@Override
public Boolean visitChildren(Node node, Object context) {
return Boolean.FALSE;
Expand Down
134 changes: 63 additions & 71 deletions core/src/main/java/org/opensearch/sql/executor/PaginatedPlanCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@
import com.google.common.hash.HashCode;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer;
Expand All @@ -25,47 +24,42 @@
import org.opensearch.sql.storage.StorageEngine;
import org.opensearch.sql.storage.TableScanOperator;

/**
* This class is entry point to paged requests. It is responsible to cursor serialization
* and deserialization.
*/
@RequiredArgsConstructor
public class PaginatedPlanCache {
public static final String CURSOR_PREFIX = "n:";
private final StorageEngine storageEngine;
public static final PaginatedPlanCache None = new PaginatedPlanCache(null);

public boolean canConvertToCursor(UnresolvedPlan plan) {
return plan.accept(new CanPaginateVisitor(), null);
}

@RequiredArgsConstructor
@Data
static class SerializationContext {
private final PaginatedPlanCache cache;
}

/**
* Converts a physical plan tree to a cursor. May cache plan related data somewhere.
*/
public Cursor convertToCursor(PhysicalPlan plan) {
public Cursor convertToCursor(PhysicalPlan plan) throws IOException {
if (plan instanceof PaginateOperator) {
var cursor = plan.toCursor();
if (cursor == null) {
return Cursor.None;
}
var raw = CURSOR_PREFIX + compress(cursor);
return new Cursor(raw.getBytes());
} else {
return Cursor.None;
}
return Cursor.None;
}

/**
* Compress serialized query plan.
* @param str string representing a query plan
* @return str compressed with gzip.
*/
@SneakyThrows
public static String compress(String str) {
String compress(String str) throws IOException {
if (str == null || str.length() == 0) {
return null;
return "";
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
Expand All @@ -75,14 +69,13 @@ public static String compress(String str) {
}

/**
* Decompresses a query plan that was compress with {@link PaginatedPlanCache.compress}.
* Decompresses a query plan that was compress with {@link PaginatedPlanCache#compress}.
* @param input compressed query plan
* @return seria
* @return decompressed string
*/
@SneakyThrows
public static String decompress(String input) {
String decompress(String input) throws IOException {
if (input == null || input.length() == 0) {
return null;
return "";
}
GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(
HashCode.fromString(input).asBytes()));
Expand Down Expand Up @@ -113,58 +106,57 @@ private String parseNamedExpressions(List<NamedExpression> listToFill, String cu
* Converts a cursor to a physical plan tree.
*/
public PhysicalPlan convertToPlan(String cursor) {
if (cursor.startsWith(CURSOR_PREFIX)) {
try {
cursor = cursor.substring(CURSOR_PREFIX.length());
cursor = decompress(cursor);

// TODO Parse with ANTLR or serialize as JSON/XML
if (!cursor.startsWith("(Paginate,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
// TODO add checks for > 0
cursor = cursor.substring(cursor.indexOf(',') + 1);
final int currentPageIndex = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10);

cursor = cursor.substring(cursor.indexOf(',') + 1);
final int pageSize = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10);

cursor = cursor.substring(cursor.indexOf(',') + 1);
if (!cursor.startsWith("(Project,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
if (!cursor.startsWith("(namedParseExpressions,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}

cursor = cursor.substring(cursor.indexOf(',') + 1);
List<NamedExpression> namedParseExpressions = new ArrayList<>();
cursor = parseNamedExpressions(namedParseExpressions, cursor);

List<NamedExpression> projectList = new ArrayList<>();
if (!cursor.startsWith("(projectList,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
cursor = parseNamedExpressions(projectList, cursor);

if (!cursor.startsWith("(OpenSearchPagedIndexScan,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
var indexName = cursor.substring(0, cursor.indexOf(','));
cursor = cursor.substring(cursor.indexOf(',') + 1);
var scrollId = cursor.substring(0, cursor.indexOf(')'));
TableScanOperator scan = storageEngine.getTableScan(indexName, scrollId);

return new PaginateOperator(new ProjectOperator(scan, projectList, namedParseExpressions),
pageSize, currentPageIndex);
} catch (Exception e) {
throw new UnsupportedOperationException("Unsupported cursor", e);
}
} else {
if (!cursor.startsWith(CURSOR_PREFIX)) {
throw new UnsupportedOperationException("Unsupported cursor");
}
try {
cursor = cursor.substring(CURSOR_PREFIX.length());
cursor = decompress(cursor);

// TODO Parse with ANTLR or serialize as JSON/XML
if (!cursor.startsWith("(Paginate,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
// TODO add checks for > 0
cursor = cursor.substring(cursor.indexOf(',') + 1);
final int currentPageIndex = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10);

cursor = cursor.substring(cursor.indexOf(',') + 1);
final int pageSize = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10);

cursor = cursor.substring(cursor.indexOf(',') + 1);
if (!cursor.startsWith("(Project,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
if (!cursor.startsWith("(namedParseExpressions,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}

cursor = cursor.substring(cursor.indexOf(',') + 1);
List<NamedExpression> namedParseExpressions = new ArrayList<>();
cursor = parseNamedExpressions(namedParseExpressions, cursor);

List<NamedExpression> projectList = new ArrayList<>();
if (!cursor.startsWith("(projectList,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
cursor = parseNamedExpressions(projectList, cursor);

if (!cursor.startsWith("(OpenSearchPagedIndexScan,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
var indexName = cursor.substring(0, cursor.indexOf(','));
cursor = cursor.substring(cursor.indexOf(',') + 1);
var scrollId = cursor.substring(0, cursor.indexOf(')'));
TableScanOperator scan = storageEngine.getTableScan(indexName, scrollId);

return new PaginateOperator(new ProjectOperator(scan, projectList, namedParseExpressions),
pageSize, currentPageIndex);
} catch (Exception e) {
throw new UnsupportedOperationException("Unsupported cursor", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
* ContinuePaginatedPlan represents cursor a request.
* It returns subsequent pages to the user (2nd page and all next).
* {@link PaginatedPlan}
*/
public class ContinuePaginatedPlan extends AbstractPlan {

public static final ContinuePaginatedPlan None
= new ContinuePaginatedPlan(QueryId.None, "", null,
null, null);
private final String cursor;
private final PaginatedQueryService queryService;
private final PaginatedPlanCache paginatedPlanCache;
Expand Down Expand Up @@ -48,7 +50,6 @@ public void execute() {
}

@Override
// TODO why can't use listener given in the constructor?
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {
listener.onFailure(new UnsupportedOperationException(
"Explain of a paged query continuation is not supported. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;

/**
* PaginatedPlan represents a page request. Dislike a regular QueryPlan,
* it returns paged response to the user and cursor, which allows to query
* next page.
* {@link ContinuePaginatedPlan}
*/
public class PaginatedPlan extends AbstractPlan {
final UnresolvedPlan plan;
final int fetchSize;
Expand Down Expand Up @@ -39,7 +45,6 @@ public void execute() {
}

@Override
// TODO why can't use listener given in the constructor?
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {
listener.onFailure(new NotImplementedException(
"`explain` feature for paginated requests is not implemented yet."));
Expand Down
Loading

0 comments on commit 4213388

Please sign in to comment.