Skip to content

Commit

Permalink
[Enhancement](maxCompute)Implement compatibility with existing maxcom…
Browse files Browse the repository at this point in the history
…pute catalogs from previous versions. (apache#41386)

before pr apache#40225
  • Loading branch information
hubgeter authored and eldenmoon committed Oct 10, 2024
1 parent a3b3da6 commit 3982657
Show file tree
Hide file tree
Showing 13 changed files with 342 additions and 88 deletions.
1 change: 1 addition & 0 deletions be/src/vec/exec/jni_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
return Status::InternalError("Failed to get/create JVM");
}
SCOPED_TIMER(_open_scanner_time);
_scanner_params.emplace("time_zone", _state->timezone_obj().name());
RETURN_IF_ERROR(_init_jni_scanner(env, batch_size));
// Call org.apache.doris.common.jni.JniScanner#open
env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TimeStampSecTZVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarBinaryVector;
Expand Down Expand Up @@ -61,8 +58,8 @@
public class MaxComputeColumnValue implements ColumnValue {
private static final Logger LOG = Logger.getLogger(MaxComputeColumnValue.class);
private int idx;
private int offset = 0; // for complex type
private ValueVector column;
private ZoneId timeZone;

public MaxComputeColumnValue() {
idx = 0;
Expand All @@ -77,10 +74,19 @@ public MaxComputeColumnValue(ValueVector valueVector, int i) {
this.idx = i;
}

/**
 * Creates a value positioned at index {@code i} of {@code valueVector} that also carries a time zone.
 *
 * @param valueVector vector this value reads from
 * @param i index within {@code valueVector}
 * @param timeZone zone stored on this value; the unpack methods hand it on to the values they create
 */
public MaxComputeColumnValue(ValueVector valueVector, int i, ZoneId timeZone) {
    this.timeZone = timeZone;
    this.idx = i;
    this.column = valueVector;
}

/**
 * Points this value at a different vector and rewinds both the index and the offset to zero.
 * The stored time zone is left as it is.
 *
 * @param column vector to read from next
 */
public void reset(ValueVector column) {
    this.offset = 0;
    this.idx = 0;
    this.column = column;
}

/**
 * Sets the time zone stored on this value.
 *
 * @param timeZone zone to store; it is passed on to the values created by the unpack methods
 */
public void setTimeZone(ZoneId timeZone) {
this.timeZone = timeZone;
}

@Override
Expand Down Expand Up @@ -283,9 +289,10 @@ public byte[] getBytes() {
@Override
public void unpackArray(List<ColumnValue> values) {
ListVector listCol = (ListVector) column;
int elemSize = listCol.getObject(idx).size();
int elemSize = listCol.getElementEndIndex(idx) - listCol.getElementStartIndex(idx);
int offset = listCol.getElementStartIndex(idx);
for (int i = 0; i < elemSize; i++) {
MaxComputeColumnValue val = new MaxComputeColumnValue(listCol.getDataVector(), offset);
MaxComputeColumnValue val = new MaxComputeColumnValue(listCol.getDataVector(), offset, timeZone);
values.add(val);
offset++;
}
Expand All @@ -295,13 +302,14 @@ public void unpackArray(List<ColumnValue> values) {
public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
MapVector mapCol = (MapVector) column;
int elemSize = mapCol.getElementEndIndex(idx) - mapCol.getElementStartIndex(idx);
int offset = mapCol.getElementStartIndex(idx);
List<FieldVector> innerCols = ((StructVector) mapCol.getDataVector()).getChildrenFromFields();
FieldVector keyList = innerCols.get(0);
FieldVector valList = innerCols.get(1);
for (int i = 0; i < elemSize; i++) {
MaxComputeColumnValue key = new MaxComputeColumnValue(keyList, offset);
MaxComputeColumnValue key = new MaxComputeColumnValue(keyList, offset, timeZone);
keys.add(key);
MaxComputeColumnValue val = new MaxComputeColumnValue(valList, offset);
MaxComputeColumnValue val = new MaxComputeColumnValue(valList, offset, timeZone);
values.add(val);
offset++;
}
Expand All @@ -312,32 +320,14 @@ public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> value
StructVector structCol = (StructVector) column;
List<FieldVector> innerCols = structCol.getChildrenFromFields();
for (Integer fieldIndex : structFieldIndex) {
MaxComputeColumnValue val = new MaxComputeColumnValue(innerCols.get(fieldIndex), idx);
MaxComputeColumnValue val = new MaxComputeColumnValue(innerCols.get(fieldIndex), idx, timeZone);
values.add(val);
}
}

public static LocalDateTime convertToLocalDateTime(TimeStampMilliTZVector milliTZVector, int index) {
public LocalDateTime convertToLocalDateTime(TimeStampMilliTZVector milliTZVector, int index) {
long timestampMillis = milliTZVector.get(index);
return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), ZoneId.systemDefault());
return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), timeZone);
}

/**
 * Reads the value at {@code index}, interprets it as nanoseconds since the epoch, and renders it
 * as a {@link LocalDateTime} in the JVM default zone.
 *
 * @param nanoTZVector vector holding the timestamps
 * @param index position of the value to convert
 * @return the local date-time for that instant in {@link ZoneId#systemDefault()}
 */
public static LocalDateTime convertToLocalDateTime(TimeStampNanoTZVector nanoTZVector, int index) {
    long epochNanos = nanoTZVector.get(index);
    long wholeSeconds = epochNanos / 1_000_000_000;
    long nanoRemainder = epochNanos % 1_000_000_000;
    Instant instant = Instant.ofEpochSecond(wholeSeconds, nanoRemainder);
    return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
}

/**
 * Reads the value at {@code index}, interprets it as seconds since the epoch, and renders it
 * as a {@link LocalDateTime} in the JVM default zone.
 *
 * @param secTZVector vector holding the timestamps
 * @param index position of the value to convert
 * @return the local date-time for that instant in {@link ZoneId#systemDefault()}
 */
public static LocalDateTime convertToLocalDateTime(TimeStampSecTZVector secTZVector, int index) {
    long epochSeconds = secTZVector.get(index);
    Instant instant = Instant.ofEpochSecond(epochSeconds);
    return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
}

/**
 * Reads the value at {@code index}, interprets it as microseconds since the epoch, and renders it
 * as a {@link LocalDateTime} in the JVM default zone.
 *
 * @param microTZVector vector holding the timestamps
 * @param index position of the value to convert
 * @return the local date-time for that instant in {@link ZoneId#systemDefault()}
 */
public static LocalDateTime convertToLocalDateTime(TimeStampMicroTZVector microTZVector, int index) {
    long epochMicros = microTZVector.get(index);
    // Split into whole seconds plus the sub-second part scaled from micros to nanos.
    Instant instant = Instant.ofEpochSecond(epochMicros / 1_000_000, (epochMicros % 1_000_000) * 1_000);
    return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.time.ZoneId;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
Expand All @@ -64,7 +65,7 @@ public class MaxComputeJniScanner extends JniScanner {
private static final String SPLIT_SIZE = "split_size";
private static final String SESSION_ID = "session_id";
private static final String SCAN_SERIALIZER = "scan_serializer";

private static final String TIME_ZONE = "time_zone";

private enum SplitType {
BYTE_SIZE,
Expand All @@ -86,7 +87,7 @@ private enum SplitType {
private long startOffset = -1L;
private long splitSize = -1L;
public EnvironmentSettings settings;

public ZoneId timeZone;

public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
String[] requiredFields = params.get("required_fields").split(",");
Expand Down Expand Up @@ -117,6 +118,13 @@ public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
project = Objects.requireNonNull(params.get(PROJECT), "required property '" + PROJECT + "'.");
table = Objects.requireNonNull(params.get(TABLE), "required property '" + TABLE + "'.");
sessionId = Objects.requireNonNull(params.get(SESSION_ID), "required property '" + SESSION_ID + "'.");
// The time zone name is a required parameter; it is resolved to a ZoneId below.
String timeZoneName = Objects.requireNonNull(params.get(TIME_ZONE), "required property '" + TIME_ZONE + "'.");
try {
    timeZone = ZoneId.of(timeZoneName);
} catch (Exception e) {
    // Best effort: a name that ZoneId cannot resolve must not abort construction,
    // so log it and fall back to the JVM default zone.
    // The separator before "fail" keeps the zone name readable in the log line.
    LOG.warn(e.getMessage() + " Set timeZoneName = " + timeZoneName + " fail, use systemDefault.");
    timeZone = ZoneId.systemDefault();
}


Account account = new AliyunAccount(accessKey, secretKey);
Expand Down Expand Up @@ -172,7 +180,7 @@ public void open() throws IOException {
LOG.info("createArrowReader failed.", e);
} catch (Exception e) {
close();
throw new IOException(e);
throw new IOException(e.getMessage(), e);
}
}

Expand All @@ -192,6 +200,7 @@ protected int getNext() throws IOException {
return 0;
}
columnValue = new MaxComputeColumnValue();
columnValue.setTimeZone(timeZone);
int expectedRows = batchSize;
return readVectors(expectedRows);
}
Expand Down
Loading

0 comments on commit 3982657

Please sign in to comment.