
Commit c68e55f

[fix](maxCompute) Fixed the bug that extra nulls are read when reading maxcompute. (apache#40888)

Previous PR: apache#40225
## Proposed changes

Fixed a bug where, when reading MaxCompute data, extra null values would be read out whenever a batch contained nulls.
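
To make the failure mode concrete, below is a minimal, self-contained sketch of this bug class; the class and method names are illustrative stand-ins, not the actual Doris reader. When the getters own the read cursor and the null path never advances it, the cursor drifts behind the row loop: a stale null gets re-read and later values are lost. The fix moves index ownership into the batch loop (`setColumnIdx(j)`), turning the getters into pure positional reads (`get(idx)` instead of `get(idx++)`).

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: NullCursorSketch is a hypothetical stand-in,
// not the actual MaxComputeColumnValue implementation.
public class NullCursorSketch {

    // Old shape: the reader keeps its own cursor. The null path appends a
    // null without moving the cursor, so the cursor falls behind the row
    // loop as soon as a null appears.
    static List<Integer> readWithReaderOwnedCursor(Integer[] batch) {
        List<Integer> out = new ArrayList<>();
        int idx = 0;                       // reader-owned cursor
        for (int row = 0; row < batch.length; row++) {
            if (batch[idx] == null) {      // isNull() peeks at a stale index
                out.add(null);             // ...and never advances it
            } else {
                out.add(batch[idx++]);     // only the non-null getter advances
            }
        }
        return out;
    }

    // New shape (this commit): the batch loop owns the index and hands it
    // to the reader for every row, mirroring columnValue.setColumnIdx(j).
    static List<Integer> readWithCallerOwnedIndex(Integer[] batch) {
        List<Integer> out = new ArrayList<>();
        for (int j = 0; j < batch.length; j++) {
            out.add(batch[j]);             // index always equals the row
        }
        return out;
    }

    public static void main(String[] args) {
        Integer[] batch = {1, null, 3};
        System.out.println(readWithReaderOwnedCursor(batch)); // [1, null, null]
        System.out.println(readWithCallerOwnedIndex(batch));  // [1, null, 3]
    }
}
```

The same ownership change is why the nested-type unpackers (`unpackArray`, `unpackMap`, `unpackStruct`) can drop their trailing `idx++` in the diff below.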
hubgeter committed Oct 9, 2024
1 parent 5611a69 commit c68e55f
Showing 6 changed files with 100 additions and 104 deletions.
@@ -68,6 +68,10 @@ public MaxComputeColumnValue() {
         idx = 0;
     }
 
+    public void setColumnIdx(int idx) {
+        this.idx = idx;
+    }
+
     public MaxComputeColumnValue(ValueVector valueVector, int i) {
         this.column = valueVector;
         this.idx = i;
@@ -89,79 +93,58 @@ public boolean isNull() {
         return column.isNull(idx);
     }
 
-    private void skippedIfNull() {
-        // null has been process by appendValue with isNull()
-        try {
-            if (column.isNull(idx)) {
-                idx++;
-            }
-        } catch (IndexOutOfBoundsException e) {
-            // skip left rows
-            idx++;
-        }
-    }
-
     @Override
     public boolean getBoolean() {
-        skippedIfNull();
         BitVector bitCol = (BitVector) column;
-        return bitCol.get(idx++) != 0;
+        return bitCol.get(idx) != 0;
     }
 
     @Override
     public byte getByte() {
-        skippedIfNull();
         TinyIntVector tinyIntCol = (TinyIntVector) column;
-        return tinyIntCol.get(idx++);
+        return tinyIntCol.get(idx);
     }
 
     @Override
     public short getShort() {
-        skippedIfNull();
         SmallIntVector smallIntCol = (SmallIntVector) column;
-        return smallIntCol.get(idx++);
+        return smallIntCol.get(idx);
     }
 
     @Override
     public int getInt() {
-        skippedIfNull();
         IntVector intCol = (IntVector) column;
-        return intCol.get(idx++);
+        return intCol.get(idx);
     }
 
     @Override
     public float getFloat() {
-        skippedIfNull();
         Float4Vector floatCol = (Float4Vector) column;
-        return floatCol.get(idx++);
+        return floatCol.get(idx);
     }
 
     @Override
     public long getLong() {
-        skippedIfNull();
         BigIntVector longCol = (BigIntVector) column;
-        return longCol.get(idx++);
+        return longCol.get(idx);
     }
 
     @Override
     public double getDouble() {
-        skippedIfNull();
         Float8Vector doubleCol = (Float8Vector) column;
-        return doubleCol.get(idx++);
+        return doubleCol.get(idx);
     }
 
     @Override
     public BigInteger getBigInteger() {
-        skippedIfNull();
         BigIntVector longCol = (BigIntVector) column;
-        return BigInteger.valueOf(longCol.get(idx++));
+        return BigInteger.valueOf(longCol.get(idx));
     }
 
     @Override
     public BigDecimal getDecimal() {
-        skippedIfNull();
         DecimalVector decimalCol = (DecimalVector) column;
-        return getBigDecimalFromArrowBuf(column.getDataBuffer(), idx++,
+        return getBigDecimalFromArrowBuf(column.getDataBuffer(), idx,
                 decimalCol.getScale(), DecimalVector.TYPE_WIDTH);
     }

@@ -195,26 +178,23 @@ public static BigDecimal getBigDecimalFromArrowBuf(ArrowBuf byteBuf, int index,
 
     @Override
     public String getString() {
-        skippedIfNull();
         VarCharVector varcharCol = (VarCharVector) column;
-        String v = varcharCol.getObject(idx++).toString();
+        String v = varcharCol.getObject(idx).toString();
         return v == null ? new String(new byte[0]) : v;
     }
 
 
 
     public String getChar() {
-        skippedIfNull();
         VarCharVector varcharCol = (VarCharVector) column;
-        return varcharCol.getObject(idx++).toString().stripTrailing();
+        return varcharCol.getObject(idx).toString().stripTrailing();
     }
 
     // Maybe I can use `appendBytesAndOffset(byte[] src, int offset, int length)` to reduce the creation of byte[].
     // But I haven't figured out how to write it elegantly.
     public byte[] getCharAsBytes() {
-        skippedIfNull();
         VarCharVector varcharCol = (VarCharVector) column;
-        byte[] v = varcharCol.getObject(idx++).getBytes();
+        byte[] v = varcharCol.getObject(idx).getBytes();
 
         if (v == null) {
             return new byte[0];
@@ -230,31 +210,28 @@ public byte[] getCharAsBytes() {
 
     @Override
     public byte[] getStringAsBytes() {
-        skippedIfNull();
         VarCharVector varcharCol = (VarCharVector) column;
-        byte[] v = varcharCol.getObject(idx++).getBytes();
+        byte[] v = varcharCol.getObject(idx).getBytes();
         return v == null ? new byte[0] : v;
     }
 
     @Override
     public LocalDate getDate() {
-        skippedIfNull();
         DateDayVector dateCol = (DateDayVector) column;
-        Integer intVal = dateCol.getObject(idx++);
+        Integer intVal = dateCol.getObject(idx);
         return LocalDate.ofEpochDay(intVal == null ? 0 : intVal);
     }
 
     @Override
     public LocalDateTime getDateTime() {
-        skippedIfNull();
         LocalDateTime result;
 
         ArrowType.Timestamp timestampType = (ArrowType.Timestamp) column.getField().getFieldType().getType();
         if (timestampType.getUnit() == org.apache.arrow.vector.types.TimeUnit.MILLISECOND) {
-            result = convertToLocalDateTime((TimeStampMilliTZVector) column, idx++);
+            result = convertToLocalDateTime((TimeStampMilliTZVector) column, idx);
         } else {
             NullableTimeStampNanoHolder valueHoder = new NullableTimeStampNanoHolder();
-            ((TimeStampNanoVector) column).get(idx++, valueHoder);
+            ((TimeStampNanoVector) column).get(idx, valueHoder);
             long timestampNanos = valueHoder.value;
 
             result = LocalDateTime.ofEpochSecond(timestampNanos / 1_000_000_000,
@@ -264,10 +241,10 @@ public LocalDateTime getDateTime() {
         /*
         timestampType.getUnit()
         result = switch (timestampType.getUnit()) {
-            case MICROSECOND -> convertToLocalDateTime((TimeStampMicroTZVector) column, idx++);
-            case SECOND -> convertToLocalDateTime((TimeStampSecTZVector) column, idx++);
-            case MILLISECOND -> convertToLocalDateTime((TimeStampMilliTZVector) column, idx++);
-            case NANOSECOND -> convertToLocalDateTime((TimeStampNanoTZVector) column, idx++);
+            case MICROSECOND -> convertToLocalDateTime((TimeStampMicroTZVector) column, idx);
+            case SECOND -> convertToLocalDateTime((TimeStampSecTZVector) column, idx);
+            case MILLISECOND -> convertToLocalDateTime((TimeStampMilliTZVector) column, idx);
+            case NANOSECOND -> convertToLocalDateTime((TimeStampNanoTZVector) column, idx);
         };
         Because :
@@ -287,9 +264,9 @@ public LocalDateTime getDateTime() {
         TIMESTAMP_NTZ is NTZ => column is TimeStampNanoVector
         So:
-        case SECOND -> convertToLocalDateTime((TimeStampSecTZVector) column, idx++);
-        case MICROSECOND -> convertToLocalDateTime((TimeStampMicroTZVector) column, idx++);
-        case NANOSECOND -> convertToLocalDateTime((TimeStampNanoTZVector) column, idx++);
+        case SECOND -> convertToLocalDateTime((TimeStampSecTZVector) column, idx);
+        case MICROSECOND -> convertToLocalDateTime((TimeStampMicroTZVector) column, idx);
+        case NANOSECOND -> convertToLocalDateTime((TimeStampNanoTZVector) column, idx);
         may never be used.
         */

@@ -298,28 +275,24 @@ public LocalDateTime getDateTime() {
 
     @Override
     public byte[] getBytes() {
-        skippedIfNull();
         VarBinaryVector binaryCol = (VarBinaryVector) column;
-        byte[] v = binaryCol.getObject(idx++);
+        byte[] v = binaryCol.getObject(idx);
         return v == null ? new byte[0] : v;
     }
 
     @Override
     public void unpackArray(List<ColumnValue> values) {
-        skippedIfNull();
         ListVector listCol = (ListVector) column;
         int elemSize = listCol.getObject(idx).size();
         for (int i = 0; i < elemSize; i++) {
             MaxComputeColumnValue val = new MaxComputeColumnValue(listCol.getDataVector(), offset);
             values.add(val);
             offset++;
         }
-        idx++;
     }
 
     @Override
     public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
-        skippedIfNull();
         MapVector mapCol = (MapVector) column;
         int elemSize = mapCol.getElementEndIndex(idx) - mapCol.getElementStartIndex(idx);
         List<FieldVector> innerCols = ((StructVector) mapCol.getDataVector()).getChildrenFromFields();
@@ -332,19 +305,16 @@ public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
             values.add(val);
             offset++;
         }
-        idx++;
     }
 
     @Override
     public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
-        skippedIfNull();
         StructVector structCol = (StructVector) column;
         List<FieldVector> innerCols = structCol.getChildrenFromFields();
         for (Integer fieldIndex : structFieldIndex) {
             MaxComputeColumnValue val = new MaxComputeColumnValue(innerCols.get(fieldIndex), idx);
             values.add(val);
         }
-        idx++;
     }
 
     public static LocalDateTime convertToLocalDateTime(TimeStampMilliTZVector milliTZVector, int index) {
@@ -226,6 +226,7 @@ private int readVectors(int expectedRows) throws IOException {
             }
             columnValue.reset(column);
             for (int j = 0; j < batchRows; j++) {
+                columnValue.setColumnIdx(j);
                 appendData(readColumnId, columnValue);
             }
         }
@@ -29,17 +29,12 @@
 import com.aliyun.odps.Odps;
 import com.aliyun.odps.OdpsException;
 import com.aliyun.odps.Partition;
-import com.aliyun.odps.Project;
 import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
-import com.aliyun.odps.security.SecurityManager;
 import com.aliyun.odps.table.configuration.SplitOptions;
 import com.aliyun.odps.table.enviroment.Credentials;
 import com.aliyun.odps.table.enviroment.EnvironmentSettings;
-import com.aliyun.odps.utils.StringUtils;
 import com.google.common.collect.ImmutableList;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -52,7 +47,6 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
     private String accessKey;
     private String secretKey;
     private String endpoint;
-    private String catalogOwner;
     private String defaultProject;
     private String quota;
     private EnvironmentSettings settings;
@@ -128,25 +122,28 @@ public Odps getClient() {
 
     protected List<String> listDatabaseNames() {
         List<String> result = new ArrayList<>();
-        try {
-            result.add(defaultProject);
-            if (StringUtils.isNullOrEmpty(catalogOwner)) {
-                SecurityManager sm = odps.projects().get().getSecurityManager();
-                String whoami = sm.runQuery("whoami", false);
-
-                JsonObject js = JsonParser.parseString(whoami).getAsJsonObject();
-                catalogOwner = js.get("DisplayName").getAsString();
-            }
-            Iterator<Project> iterator = odps.projects().iterator(catalogOwner);
-            while (iterator.hasNext()) {
-                Project project = iterator.next();
-                if (!project.getName().equals(defaultProject)) {
-                    result.add(project.getName());
-                }
-            }
-        } catch (OdpsException e) {
-            throw new RuntimeException(e);
-        }
+        result.add(defaultProject);
+
+        // TODO: Improve `show tables` and `select * from table` when `use other project`.
+        // try {
+        //     result.add(defaultProject);
+        //     if (StringUtils.isNullOrEmpty(catalogOwner)) {
+        //         SecurityManager sm = odps.projects().get().getSecurityManager();
+        //         String whoami = sm.runQuery("whoami", false);
+        //
+        //         JsonObject js = JsonParser.parseString(whoami).getAsJsonObject();
+        //         catalogOwner = js.get("DisplayName").getAsString();
+        //     }
+        //     Iterator<Project> iterator = odps.projects().iterator(catalogOwner);
+        //     while (iterator.hasNext()) {
+        //         Project project = iterator.next();
+        //         if (!project.getName().equals(defaultProject)) {
+        //             result.add(project.getName());
+        //         }
+        //     }
+        // } catch (OdpsException e) {
+        //     throw new RuntimeException(e);
+        // }
         return result;
     }

@@ -109,7 +109,6 @@ private void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeS
     void createTableBatchReadSession() throws UserException {
         Predicate filterPredicate = convertPredicate();
 
-
         List<String> requiredPartitionColumns = new ArrayList<>();
         List<String> orderedRequiredDataColumns = new ArrayList<>();
 
@@ -162,31 +161,30 @@ protected Predicate convertPredicate() {
             return Predicate.NO_PREDICATE;
         }
 
-        if (conjuncts.size() == 1) {
+        List<Predicate> odpsPredicates = new ArrayList<>();
+        for (Expr dorisPredicate : conjuncts) {
             try {
-                return convertExprToOdpsPredicate(conjuncts.get(0));
+                odpsPredicates.add(convertExprToOdpsPredicate(dorisPredicate));
             } catch (AnalysisException e) {
-                Log.info("Failed to convert predicate " + conjuncts.get(0) + " to odps predicate");
+                Log.info("Failed to convert predicate " + dorisPredicate);
                 Log.info("Reason: " + e.getMessage());
                 return Predicate.NO_PREDICATE;
             }
         }
 
-        com.aliyun.odps.table.optimizer.predicate.CompoundPredicate
-                filterPredicate = new com.aliyun.odps.table.optimizer.predicate.CompoundPredicate(
-                com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.AND
-        );
-
-        for (Expr predicate : conjuncts) {
-            try {
-                filterPredicate.addPredicate(convertExprToOdpsPredicate(predicate));
-            } catch (AnalysisException e) {
-                Log.info("Failed to convert predicate " + predicate);
-                Log.info("Reason: " + e.getMessage());
-                return Predicate.NO_PREDICATE;
+        if (odpsPredicates.isEmpty()) {
+            return Predicate.NO_PREDICATE;
+        } else if (odpsPredicates.size() == 1) {
+            return odpsPredicates.get(0);
+        } else {
+            com.aliyun.odps.table.optimizer.predicate.CompoundPredicate
+                    filterPredicate = new com.aliyun.odps.table.optimizer.predicate.CompoundPredicate(
+                    com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.AND);
+
+            for (Predicate odpsPredicate : odpsPredicates) {
+                filterPredicate.addPredicate(odpsPredicate);
             }
+            return filterPredicate;
         }
-        return filterPredicate;
     }
 
     private Predicate convertExprToOdpsPredicate(Expr expr) throws AnalysisException {
@@ -222,7 +220,7 @@ private Predicate convertExprToOdpsPredicate(Expr expr) throws AnalysisException
 
         InPredicate inPredicate = (InPredicate) expr;
         if (inPredicate.getChildren().size() > 2) {
-            return Predicate.NO_PREDICATE;
+            throw new AnalysisException("InPredicate must contain at most 1 children");
         }
         com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator odpsOp =
                 inPredicate.isNotIn()
@@ -332,7 +330,6 @@ private String convertSlotRefToColumnName(Expr expr) throws AnalysisException {
             throw new AnalysisException("Do not support convert ["
                     + expr.getExprName() + "] in convertSlotRefToAttribute.");
 
-
     }
 
     private String convertLiteralToOdpsValues(OdpsType odpsType, Expr expr) throws AnalysisException {
