diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java index 0c3a9283a8152f3..98900edce5a5461 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java @@ -68,6 +68,10 @@ public MaxComputeColumnValue() { idx = 0; } + public void setColumnIdx(int idx) { + this.idx = idx; + } + public MaxComputeColumnValue(ValueVector valueVector, int i) { this.column = valueVector; this.idx = i; @@ -89,79 +93,58 @@ public boolean isNull() { return column.isNull(idx); } - private void skippedIfNull() { - // null has been process by appendValue with isNull() - try { - if (column.isNull(idx)) { - idx++; - } - } catch (IndexOutOfBoundsException e) { - // skip left rows - idx++; - } - } - @Override public boolean getBoolean() { - skippedIfNull(); BitVector bitCol = (BitVector) column; - return bitCol.get(idx++) != 0; + return bitCol.get(idx) != 0; } @Override public byte getByte() { - skippedIfNull(); TinyIntVector tinyIntCol = (TinyIntVector) column; - return tinyIntCol.get(idx++); + return tinyIntCol.get(idx); } @Override public short getShort() { - skippedIfNull(); SmallIntVector smallIntCol = (SmallIntVector) column; - return smallIntCol.get(idx++); + return smallIntCol.get(idx); } @Override public int getInt() { - skippedIfNull(); IntVector intCol = (IntVector) column; - return intCol.get(idx++); + return intCol.get(idx); } @Override public float getFloat() { - skippedIfNull(); Float4Vector floatCol = (Float4Vector) column; - return floatCol.get(idx++); + return floatCol.get(idx); } @Override public long getLong() { - skippedIfNull(); BigIntVector longCol = (BigIntVector) column; - return longCol.get(idx++); + return longCol.get(idx); } @Override public double getDouble() { - skippedIfNull(); Float8Vector doubleCol = (Float8Vector) column; - return doubleCol.get(idx++); + return doubleCol.get(idx); } @Override public BigInteger getBigInteger() { - skippedIfNull(); BigIntVector longCol = (BigIntVector) column; - return BigInteger.valueOf(longCol.get(idx++)); + return BigInteger.valueOf(longCol.get(idx)); } @Override public BigDecimal getDecimal() { - skippedIfNull(); DecimalVector decimalCol = (DecimalVector) column; - return getBigDecimalFromArrowBuf(column.getDataBuffer(), idx++, + return getBigDecimalFromArrowBuf(column.getDataBuffer(), idx, decimalCol.getScale(), DecimalVector.TYPE_WIDTH); } @@ -195,26 +178,23 @@ public static BigDecimal getBigDecimalFromArrowBuf(ArrowBuf byteBuf, int index, @Override public String getString() { - skippedIfNull(); VarCharVector varcharCol = (VarCharVector) column; - String v = varcharCol.getObject(idx++).toString(); + String v = varcharCol.getObject(idx).toString(); return v == null ? new String(new byte[0]) : v; } public String getChar() { - skippedIfNull(); VarCharVector varcharCol = (VarCharVector) column; - return varcharCol.getObject(idx++).toString().stripTrailing(); + return varcharCol.getObject(idx).toString().stripTrailing(); } // Maybe I can use `appendBytesAndOffset(byte[] src, int offset, int length)` to reduce the creation of byte[]. // But I haven't figured out how to write it elegantly. public byte[] getCharAsBytes() { - skippedIfNull(); VarCharVector varcharCol = (VarCharVector) column; - byte[] v = varcharCol.getObject(idx++).getBytes(); + byte[] v = varcharCol.getObject(idx).getBytes(); if (v == null) { return new byte[0]; @@ -230,31 +210,28 @@ public byte[] getCharAsBytes() { @Override public byte[] getStringAsBytes() { - skippedIfNull(); VarCharVector varcharCol = (VarCharVector) column; - byte[] v = varcharCol.getObject(idx++).getBytes(); + byte[] v = varcharCol.getObject(idx).getBytes(); return v == null ? new byte[0] : v; } @Override public LocalDate getDate() { - skippedIfNull(); DateDayVector dateCol = (DateDayVector) column; - Integer intVal = dateCol.getObject(idx++); + Integer intVal = dateCol.getObject(idx); return LocalDate.ofEpochDay(intVal == null ? 0 : intVal); } @Override public LocalDateTime getDateTime() { - skippedIfNull(); LocalDateTime result; ArrowType.Timestamp timestampType = ( ArrowType.Timestamp) column.getField().getFieldType().getType(); if (timestampType.getUnit() == org.apache.arrow.vector.types.TimeUnit.MILLISECOND) { - result = convertToLocalDateTime((TimeStampMilliTZVector) column, idx++); + result = convertToLocalDateTime((TimeStampMilliTZVector) column, idx); } else { NullableTimeStampNanoHolder valueHoder = new NullableTimeStampNanoHolder(); - ((TimeStampNanoVector) column).get(idx++, valueHoder); + ((TimeStampNanoVector) column).get(idx, valueHoder); long timestampNanos = valueHoder.value; result = LocalDateTime.ofEpochSecond(timestampNanos / 1_000_000_000, @@ -264,10 +241,10 @@ public LocalDateTime getDateTime() { /* timestampType.getUnit() result = switch (timestampType.getUnit()) { - case MICROSECOND -> convertToLocalDateTime((TimeStampMicroTZVector) column, idx++); - case SECOND -> convertToLocalDateTime((TimeStampSecTZVector) column, idx++); - case MILLISECOND -> convertToLocalDateTime((TimeStampMilliTZVector) column, idx++); - case NANOSECOND -> convertToLocalDateTime((TimeStampNanoTZVector) column, idx++); + case MICROSECOND -> convertToLocalDateTime((TimeStampMicroTZVector) column, idx); + case SECOND -> convertToLocalDateTime((TimeStampSecTZVector) column, idx); + case MILLISECOND -> convertToLocalDateTime((TimeStampMilliTZVector) column, idx); + case NANOSECOND -> convertToLocalDateTime((TimeStampNanoTZVector) column, idx); }; Because : @@ -287,9 +264,9 @@ public LocalDateTime getDateTime() { TIMESTAMP_NTZ is NTZ => column is TimeStampNanoVector So: - case SECOND -> convertToLocalDateTime((TimeStampSecTZVector) column, idx++); - case MICROSECOND -> convertToLocalDateTime((TimeStampMicroTZVector) column, idx++); - case NANOSECOND -> convertToLocalDateTime((TimeStampNanoTZVector) column, idx++); + case SECOND -> convertToLocalDateTime((TimeStampSecTZVector) column, idx); + case MICROSECOND -> convertToLocalDateTime((TimeStampMicroTZVector) column, idx); + case NANOSECOND -> convertToLocalDateTime((TimeStampNanoTZVector) column, idx); may never be used. */ @@ -298,15 +275,13 @@ public LocalDateTime getDateTime() { @Override public byte[] getBytes() { - skippedIfNull(); VarBinaryVector binaryCol = (VarBinaryVector) column; - byte[] v = binaryCol.getObject(idx++); + byte[] v = binaryCol.getObject(idx); return v == null ? new byte[0] : v; } @Override public void unpackArray(List values) { - skippedIfNull(); ListVector listCol = (ListVector) column; int elemSize = listCol.getObject(idx).size(); for (int i = 0; i < elemSize; i++) { @@ -314,12 +289,10 @@ public void unpackArray(List values) { values.add(val); offset++; } - idx++; } @Override public void unpackMap(List keys, List values) { - skippedIfNull(); MapVector mapCol = (MapVector) column; int elemSize = mapCol.getElementEndIndex(idx) - mapCol.getElementStartIndex(idx); List innerCols = ((StructVector) mapCol.getDataVector()).getChildrenFromFields(); @@ -332,19 +305,16 @@ public void unpackMap(List keys, List values) { values.add(val); offset++; } - idx++; } @Override public void unpackStruct(List structFieldIndex, List values) { - skippedIfNull(); StructVector structCol = (StructVector) column; List innerCols = structCol.getChildrenFromFields(); for (Integer fieldIndex : structFieldIndex) { MaxComputeColumnValue val = new MaxComputeColumnValue(innerCols.get(fieldIndex), idx); values.add(val); } - idx++; } public static LocalDateTime convertToLocalDateTime(TimeStampMilliTZVector milliTZVector, int index) { diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java index df8066a9fa3241d..c72153c3449aac6 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java @@ -226,6 +226,7 @@ private int readVectors(int expectedRows) throws IOException { } columnValue.reset(column); for (int j = 0; j < batchRows; j++) { + columnValue.setColumnIdx(j); appendData(readColumnId, columnValue); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java index fcbc0a5e8fc0a34..cfcf4331b96a976 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java @@ -29,17 +29,12 @@ import com.aliyun.odps.Odps; import com.aliyun.odps.OdpsException; import com.aliyun.odps.Partition; -import com.aliyun.odps.Project; import com.aliyun.odps.account.Account; import com.aliyun.odps.account.AliyunAccount; -import com.aliyun.odps.security.SecurityManager; import com.aliyun.odps.table.configuration.SplitOptions; import com.aliyun.odps.table.enviroment.Credentials; import com.aliyun.odps.table.enviroment.EnvironmentSettings; -import com.aliyun.odps.utils.StringUtils; import com.google.common.collect.ImmutableList; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import java.util.ArrayList; import java.util.Iterator; @@ -52,7 +47,6 @@ public class MaxComputeExternalCatalog extends ExternalCatalog { private String accessKey; private String secretKey; private String endpoint; - private String catalogOwner; private String defaultProject; private String quota; private EnvironmentSettings settings; @@ -128,25 +122,28 @@ public Odps getClient() { protected List listDatabaseNames() { List result = new ArrayList<>(); - try { - result.add(defaultProject); - if (StringUtils.isNullOrEmpty(catalogOwner)) { - SecurityManager sm = odps.projects().get().getSecurityManager(); - String whoami = sm.runQuery("whoami", false); - - JsonObject js = JsonParser.parseString(whoami).getAsJsonObject(); - catalogOwner = js.get("DisplayName").getAsString(); - } - Iterator iterator = odps.projects().iterator(catalogOwner); - while (iterator.hasNext()) { - Project project = iterator.next(); - if (!project.getName().equals(defaultProject)) { - result.add(project.getName()); - } - } - } catch (OdpsException e) { - throw new RuntimeException(e); - } + result.add(defaultProject); + + // TODO: Improve `show tables` and `select * from table` when `use other project`. + // try { + // result.add(defaultProject); + // if (StringUtils.isNullOrEmpty(catalogOwner)) { + // SecurityManager sm = odps.projects().get().getSecurityManager(); + // String whoami = sm.runQuery("whoami", false); + // + // JsonObject js = JsonParser.parseString(whoami).getAsJsonObject(); + // catalogOwner = js.get("DisplayName").getAsString(); + // } + // Iterator iterator = odps.projects().iterator(catalogOwner); + // while (iterator.hasNext()) { + // Project project = iterator.next(); + // if (!project.getName().equals(defaultProject)) { + // result.add(project.getName()); + // } + // } + // } catch (OdpsException e) { + // throw new RuntimeException(e); + // } return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java index aee2b69a6561601..d1e715ed354905a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java @@ -109,7 +109,6 @@ private void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeS void createTableBatchReadSession() throws UserException { Predicate filterPredicate = convertPredicate(); - List requiredPartitionColumns = new ArrayList<>(); List orderedRequiredDataColumns = new ArrayList<>(); @@ -162,31 +161,30 @@ protected Predicate convertPredicate() { return Predicate.NO_PREDICATE; } - if (conjuncts.size() == 1) { + List odpsPredicates = new ArrayList<>(); + for (Expr dorisPredicate : conjuncts) { try { - return convertExprToOdpsPredicate(conjuncts.get(0)); + odpsPredicates.add(convertExprToOdpsPredicate(dorisPredicate)); } catch (AnalysisException e) { - Log.info("Failed to convert predicate " + conjuncts.get(0) + " to odps predicate"); + Log.info("Failed to convert predicate " + dorisPredicate); Log.info("Reason: " + e.getMessage()); - return Predicate.NO_PREDICATE; } } - com.aliyun.odps.table.optimizer.predicate.CompoundPredicate - filterPredicate = new com.aliyun.odps.table.optimizer.predicate.CompoundPredicate( - com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.AND - ); - - for (Expr predicate : conjuncts) { - try { - filterPredicate.addPredicate(convertExprToOdpsPredicate(predicate)); - } catch (AnalysisException e) { - Log.info("Failed to convert predicate " + predicate); - Log.info("Reason: " + e.getMessage()); - return Predicate.NO_PREDICATE; + if (odpsPredicates.isEmpty()) { + return Predicate.NO_PREDICATE; + } else if (odpsPredicates.size() == 1) { + return odpsPredicates.get(0); + } else { + com.aliyun.odps.table.optimizer.predicate.CompoundPredicate + filterPredicate = new com.aliyun.odps.table.optimizer.predicate.CompoundPredicate( + com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.AND); + + for (Predicate odpsPredicate : odpsPredicates) { + filterPredicate.addPredicate(odpsPredicate); } + return filterPredicate; } - return filterPredicate; } private Predicate convertExprToOdpsPredicate(Expr expr) throws AnalysisException { @@ -222,7 +220,7 @@ private Predicate convertExprToOdpsPredicate(Expr expr) throws AnalysisException InPredicate inPredicate = (InPredicate) expr; if (inPredicate.getChildren().size() > 2) { - return Predicate.NO_PREDICATE; + throw new AnalysisException("InPredicate must contain at most 1 children"); } com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator odpsOp = inPredicate.isNotIn() @@ -332,7 +330,6 @@ private String convertSlotRefToColumnName(Expr expr) throws AnalysisException { throw new AnalysisException("Do not support convert [" + expr.getExprName() + "] in convertSlotRefToAttribute."); - } private String convertLiteralToOdpsValues(OdpsType odpsType, Expr expr) throws AnalysisException { diff --git a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out index e1479672f232e7f..8e2dbfd52b2a51c 100644 --- a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out +++ b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out @@ -139,3 +139,21 @@ yy=2023/mm=08/dd=05/pt=5 5 2023 08 05 5 2023 08 05 +-- !null_1 -- +1 1 +2 \N +3 \N +4 4 +5 \N +6 6 + +-- !null_2 -- +1 1 +4 4 +6 6 + +-- !null_3 -- +2 \N +3 \N +5 \N + diff --git a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy index 7bcd4b5ad4198a6..6663a2aa842df38 100644 --- a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy @@ -278,7 +278,13 @@ CAST(-7.0 AS DOUBLE), CAST(8.00 AS DECIMAL(5,2)) ); - + drop table mc_test_null; + CREATE TABLE `mc_test_null` ( + `id` int, + `col` int + ); + insert into mc_test_null values (1,1),(2,NULL),(3,NULL),(4,4),(5,NULL),(6,6); + */ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remote,external_remote_maxcompute") { String enabled = context.config.otherConfigs.get("enableMaxComputeTest") @@ -353,5 +359,12 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot order_qt_multi_partition_q8 """ select count(*) from multi_partitions where pt>=3; """ order_qt_multi_partition_q9 """ select city,mnt,gender,finished_time,order_rate,cut_date,create_time,pt, yy, mm, dd from multi_partitions where pt >= 2 and pt < 4 and finished_time is not null; """ order_qt_multi_partition_q10 """ select pt, yy, mm, dd from multi_partitions where pt >= 2 and create_time > '2023-08-03 03:11:00' order by pt, yy, mm, dd; """ + + //test null value + order_qt_null_1 """ select * from mc_test_null; """ + order_qt_null_2 """ select * from mc_test_null where col is not null ; """ + order_qt_null_3 """ select * from mc_test_null where col is null ; """ + + } }