Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework on serialization and deserialization in pagination #1498

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.exception;

/**
* This should be thrown on serialization of a PhysicalPlan tree if paging is finished.
* Processing of such exception should outcome of responding no cursor to the user.
*/
public class NoCursorException extends RuntimeException {
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,17 @@

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

@EqualsAndHashCode
@RequiredArgsConstructor
public class Cursor {
public static final Cursor None = new Cursor();
public static final Cursor None = new Cursor(null);

@Getter
private final byte[] raw;

private Cursor() {
raw = new byte[] {};
}

public Cursor(byte[] raw) {
this.raw = raw;
}
private final String data;

public String toString() {
return new String(raw);
return data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.io.InputStream;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer;
import org.opensearch.sql.planner.physical.PaginateOperator;
import org.opensearch.sql.exception.NoCursorException;
import org.opensearch.sql.planner.SerializablePlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.ProjectOperator;
import org.opensearch.sql.storage.StorageEngine;
import org.opensearch.sql.storage.TableScanOperator;

/**
* This class is entry point to paged requests. It is responsible to cursor serialization
Expand All @@ -30,132 +31,101 @@
@RequiredArgsConstructor
public class PlanSerializer {
public static final String CURSOR_PREFIX = "n:";
private final StorageEngine storageEngine;

private final StorageEngine engine;

public boolean canConvertToCursor(UnresolvedPlan plan) {
return plan.accept(new CanPaginateVisitor(), null);
}

/**
* Converts a physical plan tree to a cursor. May cache plan related data somewhere.
* Converts a physical plan tree to a cursor.
*/
public Cursor convertToCursor(PhysicalPlan plan) throws IOException {
if (plan instanceof PaginateOperator) {
var cursor = plan.toCursor();
if (cursor == null) {
return Cursor.None;
}
var raw = CURSOR_PREFIX + compress(cursor);
return new Cursor(raw.getBytes());
public Cursor convertToCursor(PhysicalPlan plan) {
try {
return new Cursor(CURSOR_PREFIX
+ serialize(((SerializablePlan) plan).getPlanForSerialization()));
// ClassCastException thrown when a plan in the tree doesn't implement SerializablePlan
} catch (NotSerializableException | ClassCastException | NoCursorException e) {
return Cursor.None;
}
return Cursor.None;
}

/**
* Compress serialized query plan.
* @param str string representing a query plan
* @return str compressed with gzip.
* Serializes and compresses the object.
* @param object The object.
* @return Encoded binary data.
*/
String compress(String str) throws IOException {
if (str == null || str.length() == 0) {
return "";
protected String serialize(Serializable object) throws NotSerializableException {
try {
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutputStream objectOutput = new ObjectOutputStream(output);
objectOutput.writeObject(object);
objectOutput.flush();

ByteArrayOutputStream out = new ByteArrayOutputStream();
// GZIP provides 35-45%, lzma from apache commons-compress has few % better compression
GZIPOutputStream gzip = new GZIPOutputStream(out) { {
this.def.setLevel(Deflater.BEST_COMPRESSION);
} };
gzip.write(output.toByteArray());
gzip.close();

return HashCode.fromBytes(out.toByteArray()).toString();
} catch (NotSerializableException e) {
throw e;
} catch (IOException e) {
throw new IllegalStateException("Failed to serialize: " + object, e);
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(str.getBytes());
gzip.close();
return HashCode.fromBytes(out.toByteArray()).toString();
}

/**
* Decompresses a query plan that was compress with {@link PlanSerializer#compress}.
* @param input compressed query plan
* @return decompressed string
* Decompresses and deserializes the binary data.
* @param code Encoded binary data.
* @return An object.
*/
String decompress(String input) throws IOException {
if (input == null || input.length() == 0) {
return "";
protected Serializable deserialize(String code) {
try {
GZIPInputStream gzip = new GZIPInputStream(
new ByteArrayInputStream(HashCode.fromString(code).asBytes()));
ObjectInputStream objectInput = new CursorDeserializationStream(
new ByteArrayInputStream(gzip.readAllBytes()));
return (Serializable) objectInput.readObject();
} catch (Exception e) {
throw new IllegalStateException("Failed to deserialize object", e);
}
GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(
HashCode.fromString(input).asBytes()));
return new String(gzip.readAllBytes());
}

/**
* Parse `NamedExpression`s from cursor.
* @param listToFill List to fill with data.
* @param cursor Cursor to parse.
* @return Remaining part of the cursor.
* Converts a cursor to a physical plan tree.
*/
private String parseNamedExpressions(List<NamedExpression> listToFill, String cursor) {
var serializer = new DefaultExpressionSerializer();
if (cursor.startsWith(")")) { //empty list
return cursor.substring(cursor.indexOf(',') + 1);
}
while (!cursor.startsWith("(")) {
listToFill.add((NamedExpression)
serializer.deserialize(cursor.substring(0,
Math.min(cursor.indexOf(','), cursor.indexOf(')')))));
cursor = cursor.substring(cursor.indexOf(',') + 1);
}
return cursor;
}

/**
* Converts a cursor to a physical plan tree.
*/
public PhysicalPlan convertToPlan(String cursor) {
if (!cursor.startsWith(CURSOR_PREFIX)) {
throw new UnsupportedOperationException("Unsupported cursor");
}
try {
cursor = cursor.substring(CURSOR_PREFIX.length());
cursor = decompress(cursor);

// TODO Parse with ANTLR or serialize as JSON/XML
if (!cursor.startsWith("(Paginate,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
// TODO add checks for > 0
cursor = cursor.substring(cursor.indexOf(',') + 1);
final int currentPageIndex = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10);

cursor = cursor.substring(cursor.indexOf(',') + 1);
final int pageSize = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10);

cursor = cursor.substring(cursor.indexOf(',') + 1);
if (!cursor.startsWith("(Project,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
if (!cursor.startsWith("(namedParseExpressions,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}

cursor = cursor.substring(cursor.indexOf(',') + 1);
List<NamedExpression> namedParseExpressions = new ArrayList<>();
cursor = parseNamedExpressions(namedParseExpressions, cursor);
return (PhysicalPlan) deserialize(cursor.substring(CURSOR_PREFIX.length()));
} catch (Exception e) {
throw new UnsupportedOperationException("Unsupported cursor", e);
}
}

List<NamedExpression> projectList = new ArrayList<>();
if (!cursor.startsWith("(projectList,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
cursor = parseNamedExpressions(projectList, cursor);
/**
* This function is used in testing only, to get access to {@link CursorDeserializationStream}.
*/
public CursorDeserializationStream getCursorDeserializationStream(InputStream in)
throws IOException {
return new CursorDeserializationStream(in);
}

if (!cursor.startsWith("(OpenSearchPagedIndexScan,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
var indexName = cursor.substring(0, cursor.indexOf(','));
cursor = cursor.substring(cursor.indexOf(',') + 1);
var scrollId = cursor.substring(0, cursor.indexOf(')'));
TableScanOperator scan = storageEngine.getTableScan(indexName, scrollId);
public class CursorDeserializationStream extends ObjectInputStream {
public CursorDeserializationStream(InputStream in) throws IOException {
super(in);
}

return new PaginateOperator(new ProjectOperator(scan, projectList, namedParseExpressions),
pageSize, currentPageIndex);
} catch (Exception e) {
throw new UnsupportedOperationException("Unsupported cursor", e);
@Override
public Object resolveObject(Object obj) throws IOException {
return obj.equals("engine") ? engine : obj;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.apache.commons.lang3.NotImplementedException;
import org.opensearch.sql.executor.pagination.PlanSerializer;

/**
* All subtypes of PhysicalPlan which needs to be serialized (in cursor, for pagination feature)
* should follow one of the following options.
* <ul>
* <li>Both:
* <ul>
* <li>Override both methods from {@link Externalizable}.</li>
* <li>Define a public no-arg constructor.</li>
* </ul>
* </li>
* <li>
* Overwrite {@link #getPlanForSerialization} to return
* another instance of {@link SerializablePlan}.
* </li>
* </ul>
*/
public interface SerializablePlan extends Externalizable {

/**
* Argument is an instance of {@link PlanSerializer.CursorDeserializationStream}.
*/
@Override
default void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
throw new NotImplementedException(String.format("`readExternal` is not implemented in %s",
getClass().getSimpleName()));
}

/**
* Each plan which has as a child plan should do.
* <pre>{@code
* out.writeObject(input.getPlanForSerialization());
* }</pre>
*/
@Override
default void writeExternal(ObjectOutput out) throws IOException {
throw new NotImplementedException(String.format("`readExternal` is not implemented in %s",
getClass().getSimpleName()));
}

/**
* Override to return child or delegated plan, so parent plan should skip this one
* for serialization, but it should try to serialize grandchild plan.
* Imagine plan structure like this
* <pre>
* A -> this
* `- B -> child
* `- C -> this
* </pre>
* In that case only plans A and C should be attempted to serialize.
* It is needed to skip a `ResourceMonitorPlan` instance only, actually.
* @return Next plan for serialization.
*/
default SerializablePlan getPlanForSerialization() {
return this;
}
}
Loading