support ingestion load from s3
gnehil committed Feb 17, 2025
1 parent 03f1ac1 commit 1426025
Showing 5 changed files with 238 additions and 3 deletions.
@@ -870,12 +870,12 @@ public Object updateIngestionLoad(HttpServletRequest request, HttpServletRespons
String fullDbName = getFullDbName(db);

long loadId = -1;
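// keep the job reference outside the try block so the catch below can cancel it on failure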
LoadJob loadJob = null;
try {

String body = HttpUtils.getBody(request);
JsonMapper mapper = JsonMapper.builder().build();
JsonNode jsonNode = mapper.readTree(body);
LoadJob loadJob = null;

if (jsonNode.hasNonNull("loadId")) {
loadId = jsonNode.get("loadId").asLong();
@@ -897,6 +897,10 @@ public Object updateIngestionLoad(HttpServletRequest request, HttpServletRespons
ingestionLoadJob.updateJobStatus(statusInfo);
} catch (IOException | MetaNotFoundException | UnauthorizedException e) {
LOG.warn("update ingestion load job status failed, db: {}, load id: {}, err: {}", db, loadId, e.getMessage());
if (loadJob != null) {
loadJob.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()),
true, true);
}
return ResponseEntityBuilder.okWithCommonError(e.getMessage());
}

@@ -163,6 +163,9 @@ public class IngestionLoadJob extends LoadJob {

private List<Long> loadTableIds = new ArrayList<>();

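// ETL output file type (HDFS or S3); annotated for GSON so it persists with the job metadata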
@SerializedName(value = "ftype")
private TFileType fileType;

public IngestionLoadJob() {
super(EtlJobType.INGESTION);
}
@@ -683,6 +686,16 @@ public void updateJobStatus(Map<String, String> statusInfo) {
hadoopProperties.putAll(
gson.fromJson(statusInfo.get("hadoopProperties"), new TypeToken<HashMap<String, String>>() {
}));
switch (statusInfo.get("storageType")) {
case "HDFS":
fileType = TFileType.FILE_HDFS;
break;
case "S3":
fileType = TFileType.FILE_S3;
break;
default:
throw new IllegalArgumentException("unknown storage type: " + statusInfo.get("storageType"));
}
}

}
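For orientation, a minimal sketch (illustrative values, not part of this commit) of the statusInfo map the new switch consumes; the keys mirror the _update request body sent by the regression test below:

// Hypothetical caller-side payload; key names follow updateJobStatus() above.
Map<String, String> statusInfo = new HashMap<>();
statusInfo.put("status", "SUCCESS");
statusInfo.put("storageType", "S3"); // "HDFS" selects FILE_HDFS; any other value throws IllegalArgumentException
statusInfo.put("filePathToSize", "{\"s3://bucket/path/file.parquet\": 5745}");
statusInfo.put("hadoopProperties", "{\"AWS_ENDPOINT\":\"...\",\"AWS_ACCESS_KEY\":\"...\",\"AWS_SECRET_KEY\":\"...\",\"AWS_REGION\":\"...\"}");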
@@ -825,7 +838,7 @@ private TBrokerScanRange getTBrokerScanRange(DescriptorTable descTable, TupleDes

// broker range desc
TBrokerRangeDesc tBrokerRangeDesc = new TBrokerRangeDesc();
tBrokerRangeDesc.setFileType(TFileType.FILE_HDFS);
tBrokerRangeDesc.setFileType(fileType);
tBrokerRangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
tBrokerRangeDesc.setSplittable(false);
tBrokerRangeDesc.setStartOffset(0);
@@ -884,7 +897,7 @@ private PushTask buildPushTask(long backendId, OlapTable olapTable, long taskSig
getTBrokerScanRange(descTable, destTupleDesc, columns, hadoopProperties);
// update filePath fileSize
TBrokerRangeDesc tBrokerRangeDesc = tBrokerScanRange.getRanges().get(0);
tBrokerRangeDesc.setFileType(TFileType.FILE_HDFS);
tBrokerRangeDesc.setFileType(fileType);
tBrokerRangeDesc.setPath("");
tBrokerRangeDesc.setFileSize(-1);
String tabletMetaStr = String.format("%d.%d.%d.%d.%d", olapTable.getId(), partitionId,
@@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --
-2067215761 41V4fgCKI3Hkioe DZGaq24Yqh7SwmbPT6IX23jfC5NKqG7gE9JT4GLwiaQtoO8l6EjhGWQP9X7NHmjdqMbIN5kNeDkffOrlS6roIwj2wXpJ true 123 -4524 1014223769452772206 -6589390612399245616 29103.387 -4.09385322835896E8 886.394 -699.960 2024-02-22 2024-03-04 2024-01-01T22:13:54 2023-10-02T19:35:10
-1982022740 fcZ1o6ZXG8UOFh5 iw4Ziys42GRRTFNkVPeQEA9I5EQtBD04xfefDsPCWN0vr1 true -3 -25143 7122614823242399351 7391807634038366248 23160.604 -9.20283206353984E8 829.880 -387.403 2023-10-16 2024-05-13 2024-07-16T18:27:45 2023-11-03T05:30:21
-1228926576 1TISaGtB01BiqVt kq4u false -123 15962 3590007830423934951 -1478759439092857810 -7813.757 -6.98785793100899E8 930.743 402.350 2024-07-23 2023-07-30 2023-11-27T17:48:50 2024-03-11T21:09:58
-575060895 rRfatUJiOO5dq9Y ETjqrUNUnI5kSmkafjWfRTG8HIp98pLGXagNpXZHqOIZZDRkoGeahOwk9 false 16 -767 6623730208927396375 -3055706367894284822 12540.839 -1.047911096098831E9 -752.454 -241.620 2024-04-10 2024-05-16 2023-12-07T23:38:05 2023-12-11T05:48:36
-76042627 PcVaKC43qmIzuxY U3aGxaZumFpqcUsLI true 44 31151 9085406701767055602 -5846138572199996843 -16845.29 2.44522690225531E8 -784.720 -467.133 2023-10-31 2023-08-29 2023-09-12T10:12:46 2023-10-19T17:02:51
121048200 KPLWjhhbGXqflJi rzqOYQH9ySHPwCm5K4GdeuI28G8LLmnpqLmsLMLfyRIvcfrlubQI47wUa8QILhuS38MBkjL true 42 13182 -6601530758880565531 5619594098883737912 -2782.1506 3.86698722676211E8 478.420 -330.289 2024-06-17 2023-12-26 2024-04-28T03:29:04 2023-08-18T21:05:32
262860291 m3XgmlbIHYNH1qS BTJRzVrpM78zJAsHMEGhkF5BiDoc3yJuoV0s209sFcqElZsheBgolBGlFl9X4EfauD64FcFF2Mi4V0dKZfpDgaLLRPfG1SALV7 false -42 5990 -7504815416577235660 1535659792778122944 1171.9619 1.28834143701229E8 626.721 682.828 2023-11-24 2023-11-18 2024-03-21T11:50:17 2024-03-31T12:59:27
579428879 KsOC6WGrieGlo7B SzeA6tRbsiGWJTBDvBQdBjCqjSE6Y false -111 32758 4029182463831656671 -3546198025044093789 20338.55 -2.015222388533773E9 61.981 720.310 2023-11-13 2024-07-04 2024-07-19T12:42:28 2024-01-04T10:32:53
1145092610 xWJUDWAV8Nllo0F dnZ9RMVdoqxh4kGBvy55zQdChNTVYdlvRZP4aWIkXyErUbM1XmFGQ9vuCD113JKKCyx4crDoY false 115 -22832 -7242855305248390982 -4240353246453053617 -9074.909 -2.51212400295869E8 -502.410 618.820 2024-06-12 2024-04-18 2023-11-04T09:55:17 2023-11-13T16:30:23
1736373707 UU14wnLhPkBid41 pmuNqYfOc3JCscf9meT5dYB2i28Pt9iaeXK4QqjVZJdoKFOeZI5bG9RKm1zInTdDMW1N0PKI5Y true -105 -20276 360048259532857165 -4602633478165721463 -13230.296 -1.708246954394742E9 757.147 -533.800 2024-01-05 2023-09-08 2023-11-27T05:21:33 2024-02-11T21:35:03

39 changes: 39 additions & 0 deletions regression-test/plugins/aliyun_oss_sdk.groovy
@@ -28,6 +28,8 @@ import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.model.CopyObjectRequest;
import com.aliyun.oss.model.CopyObjectResult;
import com.aliyun.oss.model.DeleteObjectsRequest;
import com.aliyun.oss.model.DeleteObjectsResult;
import com.aliyun.oss.model.ListObjectsRequest;
@@ -170,3 +172,40 @@ Suite.metaClass.getOssAllDirSizeWithPrefix = { OSS client, String bucketName, St
logger.info("Done!")
}
}

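// Copy srcObject to dstObject within the same bucket; the S3 ingestion test
// uses this to place a prepared parquet file at the label-specific path the FE expects.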
Suite.metaClass.copyObject = { OSS client, String bucketName, String srcObject, String dstObject="" ->

try {

if (!client.doesBucketExist(bucketName)) {
logger.info("no bucket named ${bucketName} in ${endpoint}")
return false;
}

// Create the CopyObjectRequest
CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName, srcObject, bucketName, dstObject);

// Execute the copy
CopyObjectResult result = client.copyObject(copyObjectRequest);

// Log the result
logger.info("ETag: " + result.getETag());
logger.info("LastModified: " + result.getLastModified());
} catch (OSSException oe) {
logger.error("Caught an OSSException, which means your request made it to OSS, but was rejected with an error response for some reason.");
logger.error("Error Message:" + oe.getErrorMessage());
logger.error("Error Code:" + oe.getErrorCode());
logger.error("Request ID:" + oe.getRequestId());
logger.error("Host ID:" + oe.getHostId());
return false;
} catch (ClientException ce) {
logger.error("Caught an ClientException, which means the client encountered a serious internal problem while trying to communicate with OSS, such as not being able to access the network.");
logger.error("Error Message:" + ce.getMessage());
return false;
} finally {
logger.info("Done!")
}

return true;

}
@@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import java.nio.file.Files
import java.nio.file.Paths
import java.nio.file.StandardCopyOption

suite('test_ingestion_load_s3', 'p0') {

def testIngestLoadJob = { testTable, loadLabel ->

sql "TRUNCATE TABLE ${testTable}"

sql "CLEAN LABEL FROM ${context.dbName}"

long loadId = -1
long tableId = -1
long partitionId = -1
long indexId = -1
long bucketId = 0
Integer schemaHash = -1

String reqBody =
"""{
"label": "${loadLabel}",
"tableToPartition": {
"${testTable}": []
},
"properties": {}
}"""

httpTest {
endpoint context.config.feHttpAddress
uri "/api/ingestion_load/internal/${context.dbName}/_create"
op "post"
basicAuthorization context.config.feHttpUser, context.config.feHttpPassword
body reqBody
check { code, resBody ->
assert code == 200
def resBodyJson = parseJson(resBody)
assert resBodyJson instanceof Map
assert resBodyJson.code == 0
def data = resBodyJson.data
loadId = data.loadId
def tableMeta = data.tableMeta
tableId = tableMeta["${testTable}"].id
def index = tableMeta["${testTable}"].indexes[0]
indexId = index.indexId
schemaHash = index.schemaHash
partitionId = tableMeta["${testTable}"].partitionInfo.partitions[0].partitionId
}
}

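// The ETL output name follows V1.<label>.<tableId>.<partitionId>.<indexId>.<bucketId>.<schemaHash>.parquet,
// which lets the FE map the file back to its tablet (cf. tabletMetaStr in buildPushTask above).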
String resultFileName = "V1.${loadLabel}.${tableId}.${partitionId}.${indexId}.${bucketId}.${schemaHash}.parquet"
logger.info("resultFileName: " + resultFileName)

String ak = getS3AK()
String sk = getS3SK()
String s3_endpoint = getS3Endpoint()
String bucket = context.config.otherConfigs.get("s3BucketName");

def ossClient = initOssClient(ak, sk, s3_endpoint)
def res = copyObject(ossClient, bucket, "regression/load_p0/ingestion_load/data.parquet", "regression/load_p0/ingestion_load/${resultFileName}")
assert res

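// dppResult mimics the summary a Spark ETL job would report; quotes are
// backslash-escaped because it is embedded as a JSON string value in the request body below.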
String dppResult = '{\\"isSuccess\\":true,\\"failedReason\\":\\"\\",\\"scannedRows\\":10,\\"fileNumber\\":1,' +
'\\"fileSize\\":5745,\\"normalRows\\":10,\\"abnormalRows\\":0,\\"unselectRows\\":0,' +
'\\"partialAbnormalRows\\":\\"[]\\",\\"scannedBytes\\":0}'

String etlResultFilePath = "s3://${bucket}/regression/load_p0/ingestion_load/${resultFileName}"

String updateStatusReqBody =
"""{
"loadId": ${loadId},
"statusInfo": {
"status": "SUCCESS",
"msg": "",
"appId": "",
"dppResult": "${dppResult}",
"filePathToSize": "{\\"${etlResultFilePath}\\": 5745}",
"hadoopProperties": "{\\"AWS_ENDPOINT\\":\\"${s3_endpoint}\\",\\"AWS_ACCESS_KEY\\":\\"${ak}\\",\\"AWS_SECRET_KEY\\":\\"${sk}\\", \\"AWS_REGION\\":\\"${getS3Region()}\\"}",
"storageType":"S3"
}
}"""

httpTest {
endpoint context.config.feHttpAddress
uri "/api/ingestion_load/internal/${context.dbName}/_update"
op "post"
basicAuthorization context.config.feHttpUser, context.config.feHttpPassword
body updateStatusReqBody
check { code, resBody ->
assert code == 200
def resBodyJson = parseJson(resBody)
assert resBodyJson instanceof Map
assert resBodyJson.code == 0
}
}

def max_try_milli_secs = 120000
while (max_try_milli_secs) {
def result = sql "show load where label = '${loadLabel}'"
if (result[0][2] == "FINISHED") {
sql "sync"
qt_select "select * from ${testTable} order by 1"
break
} else {
sleep(5000) // wait 5 seconds between retries
max_try_milli_secs -= 5000
if (max_try_milli_secs <= 0) {
assertEquals(1, 2) // fail the test: load did not finish within 120s
}
}
}

}

def tableName = 'tbl_test_spark_load'

sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
c_int int(11) NULL,
c_char char(15) NULL,
c_varchar varchar(100) NULL,
c_bool boolean NULL,
c_tinyint tinyint(4) NULL,
c_smallint smallint(6) NULL,
c_bigint bigint(20) NULL,
c_largeint largeint(40) NULL,
c_float float NULL,
c_double double NULL,
c_decimal decimal(6, 3) NULL,
c_decimalv3 decimal(6, 3) NULL,
c_date date NULL,
c_datev2 date NULL,
c_datetime datetime NULL,
c_datetimev2 datetime NULL
)
DUPLICATE KEY(c_int)
DISTRIBUTED BY HASH(c_int) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
)
"""

def label = "test_ingestion_load_s3"

testIngestLoadJob.call(tableName, label)

}
