Skip to content

Commit

Permalink
Support for nested expression serialization
Browse files Browse the repository at this point in the history
Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>
  • Loading branch information
MaxKsyunz committed Mar 21, 2023
1 parent ec5fb40 commit ba9130a
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import lombok.SneakyThrows;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer;
import org.opensearch.sql.expression.serialization.NoEncodeExpressionSerializer;
import org.opensearch.sql.opensearch.executor.Cursor;
import org.opensearch.sql.planner.physical.PaginateOperator;
import org.opensearch.sql.planner.physical.PhysicalPlan;
Expand All @@ -28,6 +28,7 @@
@RequiredArgsConstructor
public class PaginatedPlanCache {
public static final String CURSOR_PREFIX = "n:";
public static final String UNSUPPORTED_CURSOR = "Unsupported cursor";
private final StorageEngine storageEngine;
public static final PaginatedPlanCache None = new PaginatedPlanCache(null);

Expand Down Expand Up @@ -82,7 +83,7 @@ public static String compress(String str) {
@SneakyThrows
public static String decompress(String input) {
if (input == null || input.length() == 0) {
return null;
return "";
}
GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(
HashCode.fromString(input).asBytes()));
Expand All @@ -96,14 +97,14 @@ public static String decompress(String input) {
* @return Remaining part of the cursor.
*/
private String parseNamedExpressions(List<NamedExpression> listToFill, String cursor) {
var serializer = new DefaultExpressionSerializer();
var serializer = new NoEncodeExpressionSerializer();
if (cursor.startsWith(")")) { //empty list
return cursor.substring(cursor.indexOf(',') + 1);
}
while (!cursor.startsWith("(")) {
listToFill.add((NamedExpression)
serializer.deserialize(cursor.substring(0,
Math.min(cursor.indexOf(','), cursor.indexOf(')')))));
Math.min(cursor.indexOf(','), cursor.indexOf(')'))).getBytes()));
cursor = cursor.substring(cursor.indexOf(',') + 1);
}
return cursor;
Expand All @@ -120,7 +121,7 @@ public PhysicalPlan convertToPlan(String cursor) {

// TODO Parse with ANTLR or serialize as JSON/XML
if (!cursor.startsWith("(Paginate,")) {
throw new UnsupportedOperationException("Unsupported cursor");
throw new UnsupportedOperationException(UNSUPPORTED_CURSOR);
}
// TODO add checks for > 0
cursor = cursor.substring(cursor.indexOf(',') + 1);
Expand All @@ -131,11 +132,11 @@ public PhysicalPlan convertToPlan(String cursor) {

cursor = cursor.substring(cursor.indexOf(',') + 1);
if (!cursor.startsWith("(Project,")) {
throw new UnsupportedOperationException("Unsupported cursor");
throw new UnsupportedOperationException(UNSUPPORTED_CURSOR);
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
if (!cursor.startsWith("(namedParseExpressions,")) {
throw new UnsupportedOperationException("Unsupported cursor");
throw new UnsupportedOperationException(UNSUPPORTED_CURSOR);
}

cursor = cursor.substring(cursor.indexOf(',') + 1);
Expand All @@ -144,13 +145,13 @@ public PhysicalPlan convertToPlan(String cursor) {

List<NamedExpression> projectList = new ArrayList<>();
if (!cursor.startsWith("(projectList,")) {
throw new UnsupportedOperationException("Unsupported cursor");
throw new UnsupportedOperationException(UNSUPPORTED_CURSOR);
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
cursor = parseNamedExpressions(projectList, cursor);

if (!cursor.startsWith("(OpenSearchPagedIndexScan,")) {
throw new UnsupportedOperationException("Unsupported cursor");
throw new UnsupportedOperationException(UNSUPPORTED_CURSOR);
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
var indexName = cursor.substring(0, cursor.indexOf(','));
Expand All @@ -161,10 +162,10 @@ public PhysicalPlan convertToPlan(String cursor) {
return new PaginateOperator(new ProjectOperator(scan, projectList, namedParseExpressions),
pageSize, currentPageIndex);
} catch (Exception e) {
throw new UnsupportedOperationException("Unsupported cursor", e);
throw new UnsupportedOperationException(UNSUPPORTED_CURSOR, e);
}
} else {
throw new UnsupportedOperationException("Unsupported cursor");
throw new UnsupportedOperationException(UNSUPPORTED_CURSOR);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,26 @@

package org.opensearch.sql.expression.serialization;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Base64;
import org.opensearch.sql.expression.Expression;


/**
* Default serializer that (de-)serialize expressions by JDK serialization.
*/
public class DefaultExpressionSerializer implements ExpressionSerializer {

NoEncodeExpressionSerializer noEncodeSerializer = new NoEncodeExpressionSerializer();

@Override
public String serialize(Expression expr) {
try {
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutputStream objectOutput = new ObjectOutputStream(output);
objectOutput.writeObject(expr);
objectOutput.flush();
return Base64.getEncoder().encodeToString(output.toByteArray());
} catch (IOException e) {
throw new IllegalStateException("Failed to serialize expression: " + expr, e);
}
return Base64.getEncoder().encodeToString(noEncodeSerializer.serialize(expr));
}

@Override
public Expression deserialize(String code) {
try {
ByteArrayInputStream input = new ByteArrayInputStream(Base64.getDecoder().decode(code));
ObjectInputStream objectInput = new ObjectInputStream(input);
return (Expression) objectInput.readObject();
return noEncodeSerializer.deserialize(Base64.getDecoder().decode(code));
} catch (Exception e) {
throw new IllegalStateException("Failed to deserialize expression code: " + code, e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.expression.serialization;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.opensearch.sql.expression.Expression;

public class NoEncodeExpressionSerializer {

/**
* Serialize an expression into a byte array.
*/
public byte[] serialize(Expression expr) {
try {
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutputStream objectOutput = new ObjectOutputStream(output);
objectOutput.writeObject(expr);
objectOutput.flush();
return output.toByteArray();
} catch (IOException e) {
throw new IllegalStateException("Failed to serialize expression: " + expr, e);
}
}

/**
* Create an expression from a serialized byte array.
*/
public Expression deserialize(byte[] code) {
try {
ByteArrayInputStream input = new ByteArrayInputStream(code);
ObjectInputStream objectInput = new ObjectInputStream(input);
return (Expression) objectInput.readObject();
} catch (Exception e) {
throw new IllegalStateException("Failed to deserialize expression code: " + code, e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.parse.ParseExpression;
import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer;
import org.opensearch.sql.expression.serialization.NoEncodeExpressionSerializer;

/**
* Project the fields specified in {@link ProjectOperator#projectList} from input.
Expand Down Expand Up @@ -102,11 +102,13 @@ public String toCursor() {
if (child == null || child.isEmpty()) {
return null;
}
var serializer = new DefaultExpressionSerializer();
var serializer = new NoEncodeExpressionSerializer();
String projects = createSection("projectList",
projectList.stream().map(serializer::serialize).toArray(String[]::new));
projectList.stream().map(serializer::serialize)
.map(Object::toString).toArray(String[]::new));
String namedExpressions = createSection("namedParseExpressions",
namedParseExpressions.stream().map(serializer::serialize).toArray(String[]::new));
namedParseExpressions.stream().map(serializer::serialize)
.map(Object::toString).toArray(String[]::new));
return createSection("Project", namedExpressions, projects, child);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.expression.serialization;


import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.opensearch.sql.data.type.ExprCoreType.STRING;
import static org.opensearch.sql.expression.DSL.literal;
import static org.opensearch.sql.expression.DSL.ref;

import org.junit.jupiter.api.Test;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.expression.DSL;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.ExpressionNodeVisitor;
import org.opensearch.sql.expression.env.Environment;

class NoEncodeExpressionSerializerTest {

private final NoEncodeExpressionSerializer serializer = new NoEncodeExpressionSerializer();

@Test
void can_serialize_and_deserialize_literals() {
Expression original = literal(10);
Expression actual = serializer.deserialize(serializer.serialize(original));
assertEquals(original, actual);
}

@Test
void can_serialize_and_deserialize_references() {
Expression original = ref("name", STRING);
Expression actual = serializer.deserialize(serializer.serialize(original));
assertEquals(original, actual);
}

@Test
void can_serialize_and_deserialize_predicates() {
Expression original = DSL.or(literal(true), DSL.less(literal(1), literal(2)));
Expression actual = serializer.deserialize(serializer.serialize(original));
assertEquals(original, actual);
}

@Test
void can_serialize_and_deserialize_functions() {
Expression original = DSL.abs(literal(30.0));
Expression actual = serializer.deserialize(serializer.serialize(original));
assertEquals(original, actual);
}

@Test
void cannot_serialize_illegal_expression() {
Expression illegalExpr = new Expression() {
private final Object object = new Object(); // non-serializable
@Override
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
return null;
}

@Override
public ExprType type() {
return null;
}

@Override
public <T, C> T accept(ExpressionNodeVisitor<T, C> visitor, C context) {
return null;
}
};
assertThrows(IllegalStateException.class, () -> serializer.serialize(illegalExpr));
}

@Test
void cannot_deserialize_illegal_expression_code() {
var arr = "hello world".getBytes();
assertThrows(IllegalStateException.class, () -> serializer.deserialize(arr));
}
}

0 comments on commit ba9130a

Please sign in to comment.