From 952b5cd242d90fa8ddcccea81c9570bbbb085b8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8D=A2=E6=98=A5=E4=BA=AE?= <946240095@qq.com> Date: Tue, 7 May 2024 17:23:48 +0800 Subject: [PATCH] [INLONG-10129][SDK] Transform SQL support +-*/ operations --- .../transform/process/TransformProcessor.java | 13 +++- .../process/operator/EqualsToOperator.java | 5 +- .../operator/GreaterThanEqualsOperator.java | 3 +- .../process/operator/GreaterThanOperator.java | 3 +- .../operator/MinorThanEqualsOperator.java | 3 +- .../process/operator/MinorThanOperator.java | 3 +- .../process/operator/NotEqualsToOperator.java | 5 +- .../process/operator/OperatorTools.java | 70 ++++++++++++++++++- .../process/parser/AdditionParser.java | 56 +++++++++++++++ .../process/parser/DivisionParser.java | 56 +++++++++++++++ .../process/parser/MultiplicationParser.java | 56 +++++++++++++++ .../process/parser/ParenthesisParser.java | 47 +++++++++++++ .../process/parser/SubtractionParser.java | 56 +++++++++++++++ .../process/TestTransformProcessor.java | 26 +++++++ 14 files changed, 387 insertions(+), 15 deletions(-) create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java index 23ca6644fa1..c979ef71c4a 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java @@ -52,6 +52,8 @@ import net.sf.jsqlparser.statement.select.SelectExpressionItem; import net.sf.jsqlparser.statement.select.SelectItem; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.StringReader; import java.nio.charset.Charset; @@ -67,6 +69,8 @@ */ public class TransformProcessor { + private static final Logger LOG = LoggerFactory.getLogger(TransformProcessor.class); + private TransformConfig config; private SourceDecoder decoder; private SinkEncoder encoder; @@ -166,8 +170,13 @@ public List transform(byte[] srcBytes, Map extParams) { SinkData sinkData = new DefaultSinkData(); for (Entry entry : this.selectItemMap.entrySet()) { String fieldName = entry.getKey(); - Object fieldValue = entry.getValue().parse(sourceData, i); - sinkData.putField(fieldName, String.valueOf(fieldValue)); + try { + Object fieldValue = entry.getValue().parse(sourceData, i); + sinkData.putField(fieldName, String.valueOf(fieldValue)); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + sinkData.putField(fieldName, ""); + } } sinkDatas.add(this.encoder.encode(sinkData)); } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java index 31726260001..6910e0c9ca9 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/EqualsToOperator.java @@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.EqualsTo; -import org.apache.commons.lang.ObjectUtils; /** * EqualsToOperator @@ -43,9 +42,11 @@ public EqualsToOperator(EqualsTo expr) { * @param rowIndex * @return */ + @SuppressWarnings("rawtypes") @Override public boolean check(SourceData sourceData, int rowIndex) { - return ObjectUtils.equals(this.left.parse(sourceData, rowIndex), this.right.parse(sourceData, rowIndex)); + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), + (Comparable) this.right.parse(sourceData, rowIndex)) == 0; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java index 07da9d79c2b..eb7689932e1 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanEqualsOperator.java @@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals; -import org.apache.commons.lang.ObjectUtils; /** * GreaterThanEqualsOperator @@ -46,7 +45,7 @@ public GreaterThanEqualsOperator(GreaterThanEquals expr) { @SuppressWarnings("rawtypes") @Override public boolean check(SourceData sourceData, int rowIndex) { - return ObjectUtils.compare((Comparable) this.left.parse(sourceData, rowIndex), + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), (Comparable) this.right.parse(sourceData, rowIndex)) >= 0; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java index 3b2158d96b0..e0db44b1e30 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/GreaterThanOperator.java @@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.GreaterThan; -import org.apache.commons.lang.ObjectUtils; /** * GreaterThanOperator @@ -46,7 +45,7 @@ public GreaterThanOperator(GreaterThan expr) { @SuppressWarnings("rawtypes") @Override public boolean check(SourceData sourceData, int rowIndex) { - return ObjectUtils.compare((Comparable) this.left.parse(sourceData, rowIndex), + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), (Comparable) this.right.parse(sourceData, rowIndex)) > 0; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java index fec4ed80191..8b3628ddb7d 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanEqualsOperator.java @@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals; -import org.apache.commons.lang.ObjectUtils; /** * MinorThanEqualsOperator @@ -46,7 +45,7 @@ public MinorThanEqualsOperator(MinorThanEquals expr) { @SuppressWarnings("rawtypes") @Override public boolean check(SourceData sourceData, int rowIndex) { - return ObjectUtils.compare((Comparable) this.left.parse(sourceData, rowIndex), + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), (Comparable) this.right.parse(sourceData, rowIndex)) <= 0; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java index 5d9db7dd9c5..17baa9cb173 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/MinorThanOperator.java @@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.MinorThan; -import org.apache.commons.lang.ObjectUtils; /** * MinorThanOperator @@ -46,7 +45,7 @@ public MinorThanOperator(MinorThan expr) { @SuppressWarnings("rawtypes") @Override public boolean check(SourceData sourceData, int rowIndex) { - return ObjectUtils.compare((Comparable) this.left.parse(sourceData, rowIndex), + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), (Comparable) this.right.parse(sourceData, rowIndex)) < 0; } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java index 9c58e704763..dbe185dec50 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/NotEqualsToOperator.java @@ -21,7 +21,6 @@ import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; -import org.apache.commons.lang.ObjectUtils; /** * NotEqualsToOperator @@ -43,9 +42,11 @@ public NotEqualsToOperator(NotEqualsTo expr) { * @param rowIndex * @return */ + @SuppressWarnings("rawtypes") @Override public boolean check(SourceData sourceData, int rowIndex) { - return !ObjectUtils.equals(this.left.parse(sourceData, rowIndex), this.right.parse(sourceData, rowIndex)); + return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex), + (Comparable) this.right.parse(sourceData, rowIndex)) != 0; } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index 8afe2f0c74d..361ce1aec3c 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -17,9 +17,14 @@ package org.apache.inlong.sdk.transform.process.operator; +import org.apache.inlong.sdk.transform.process.parser.AdditionParser; import org.apache.inlong.sdk.transform.process.parser.ColumnParser; +import org.apache.inlong.sdk.transform.process.parser.DivisionParser; import org.apache.inlong.sdk.transform.process.parser.LongParser; +import org.apache.inlong.sdk.transform.process.parser.MultiplicationParser; +import org.apache.inlong.sdk.transform.process.parser.ParenthesisParser; import org.apache.inlong.sdk.transform.process.parser.StringParser; +import org.apache.inlong.sdk.transform.process.parser.SubtractionParser; import org.apache.inlong.sdk.transform.process.parser.ValueParser; import net.sf.jsqlparser.expression.Expression; @@ -28,6 +33,10 @@ import net.sf.jsqlparser.expression.NotExpression; import net.sf.jsqlparser.expression.Parenthesis; import net.sf.jsqlparser.expression.StringValue; +import net.sf.jsqlparser.expression.operators.arithmetic.Addition; +import net.sf.jsqlparser.expression.operators.arithmetic.Division; +import net.sf.jsqlparser.expression.operators.arithmetic.Multiplication; +import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction; import net.sf.jsqlparser.expression.operators.conditional.AndExpression; import net.sf.jsqlparser.expression.operators.conditional.OrExpression; import net.sf.jsqlparser.expression.operators.relational.EqualsTo; @@ -37,6 +46,9 @@ import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals; import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo; import net.sf.jsqlparser.schema.Column; +import org.apache.commons.lang.ObjectUtils; + +import java.math.BigDecimal; /** * OperatorTools @@ -44,6 +56,10 @@ */ public class OperatorTools { + public static final String ROOT_KEY = "$root"; + + public static final String CHILD_KEY = "$child"; + public static ExpressionOperator buildOperator(Expression expr) { if (expr instanceof AndExpression) { return new AndOperator((AndExpression) expr); @@ -76,9 +92,61 @@ public static ValueParser buildParser(Expression expr) { return new StringParser((StringValue) expr); } else if (expr instanceof LongValue) { return new LongParser((LongValue) expr); + } else if (expr instanceof Parenthesis) { + return new ParenthesisParser((Parenthesis) expr); + } else if (expr instanceof Addition) { + return new AdditionParser((Addition) expr); + } else if (expr instanceof Subtraction) { + return new SubtractionParser((Subtraction) expr); + } else if (expr instanceof Multiplication) { + return new MultiplicationParser((Multiplication) expr); + } else if (expr instanceof Division) { + return new DivisionParser((Division) expr); } else if (expr instanceof Function) { - return new ColumnParser((Function) expr); + String exprString = expr.toString(); + if (exprString.startsWith(ROOT_KEY) || exprString.startsWith(CHILD_KEY)) { + return new ColumnParser((Function) expr); + } else { + // TODO + } } return null; } + + /** + * parseBigDecimal + * @param value + * @return + */ + public static BigDecimal parseBigDecimal(Object value) { + if (value instanceof BigDecimal) { + return (BigDecimal) value; + } else { + return new BigDecimal(String.valueOf(value)); + } + } + + /** + * compareValue + * @param value + * @return + */ + @SuppressWarnings("rawtypes") + public static int compareValue(Comparable left, Comparable right) { + if (left instanceof String) { + if (right instanceof String) { + return ObjectUtils.compare(left, right); + } else { + BigDecimal leftValue = parseBigDecimal(left); + return ObjectUtils.compare(leftValue, right); + } + } else { + if (right instanceof String) { + BigDecimal rightValue = parseBigDecimal(right); + return ObjectUtils.compare(left, rightValue); + } else { + return ObjectUtils.compare(left, right); + } + } + } } diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java new file mode 100644 index 00000000000..a0f03ab4cd0 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/AdditionParser.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import net.sf.jsqlparser.expression.operators.arithmetic.Addition; + +import java.math.BigDecimal; + +/** + * AdditionParser + * + */ +public class AdditionParser implements ValueParser { + + private ValueParser left; + + private ValueParser right; + + public AdditionParser(Addition expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + /** + * parse + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex) { + Object leftObj = this.left.parse(sourceData, rowIndex); + Object rightObj = this.right.parse(sourceData, rowIndex); + BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); + BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); + return leftValue.add(rightValue); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java new file mode 100644 index 00000000000..5dc94b6e999 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/DivisionParser.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import net.sf.jsqlparser.expression.operators.arithmetic.Division; + +import java.math.BigDecimal; + +/** + * DivisionParser + * + */ +public class DivisionParser implements ValueParser { + + private ValueParser left; + + private ValueParser right; + + public DivisionParser(Division expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + /** + * parse + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex) { + Object leftObj = this.left.parse(sourceData, rowIndex); + Object rightObj = this.right.parse(sourceData, rowIndex); + BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); + BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); + return leftValue.divide(rightValue); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java new file mode 100644 index 00000000000..7918b434ac7 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/MultiplicationParser.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import net.sf.jsqlparser.expression.operators.arithmetic.Multiplication; + +import java.math.BigDecimal; + +/** + * MultiplicationParser + * + */ +public class MultiplicationParser implements ValueParser { + + private ValueParser left; + + private ValueParser right; + + public MultiplicationParser(Multiplication expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + /** + * parse + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex) { + Object leftObj = this.left.parse(sourceData, rowIndex); + Object rightObj = this.right.parse(sourceData, rowIndex); + BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); + BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); + return leftValue.multiply(rightValue); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java new file mode 100644 index 00000000000..61a2bd1bf31 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/ParenthesisParser.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import net.sf.jsqlparser.expression.Parenthesis; + +/** + * ParenthesisParser + * + */ +public class ParenthesisParser implements ValueParser { + + private ValueParser node; + + public ParenthesisParser(Parenthesis expr) { + this.node = OperatorTools.buildParser(expr.getExpression()); + } + + /** + * parse + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex) { + return node.parse(sourceData, rowIndex); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java new file mode 100644 index 00000000000..af36c79452e --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/parser/SubtractionParser.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.parser; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; + +import net.sf.jsqlparser.expression.operators.arithmetic.Subtraction; + +import java.math.BigDecimal; + +/** + * SubtractionParser + * + */ +public class SubtractionParser implements ValueParser { + + private ValueParser left; + + private ValueParser right; + + public SubtractionParser(Subtraction expr) { + this.left = OperatorTools.buildParser(expr.getLeftExpression()); + this.right = OperatorTools.buildParser(expr.getRightExpression()); + } + + /** + * parse + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex) { + Object leftObj = this.left.parse(sourceData, rowIndex); + Object rightObj = this.right.parse(sourceData, rowIndex); + BigDecimal leftValue = OperatorTools.parseBigDecimal(leftObj); + BigDecimal rightValue = OperatorTools.parseBigDecimal(rightObj); + return leftValue.subtract(rightValue); + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java index 282e45edfb2..b508f8f2aa4 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformProcessor.java @@ -269,4 +269,30 @@ public void testPb2CsvForOne() { e.printStackTrace(); } } + + @Test + public void testPb2CsvForAdd() { + try { + List fields = this.getTestFieldList(); + String transformBase64 = this.getPbTestDescription(); + SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", null); + SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields); + String transformSql = "select $root.sid," + + "($root.msgs(1).msgTime-$root.msgs(0).msgTime)/$root.packageID field2," + + "$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/$root.packageID)" + + "*$root.packageID field3," + + "$root.msgs(0).msg field4 from source " + + "where $root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime" + + "+$root.msgs(0).msgTime+$root.msgs(1).msgTime)"; + TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql); + // case1 + TransformProcessor processor = new TransformProcessor(config); + byte[] srcBytes = this.getPbTestData(); + List output = processor.transform(srcBytes, new HashMap<>()); + Assert.assertTrue(output.size() == 1); + Assert.assertEquals(output.get(0), "sid|2|3426487836002|msgValue4"); + } catch (Exception e) { + e.printStackTrace(); + } + } }