From 4e07c38943265cf1f6004cf7656837ae4d013b05 Mon Sep 17 00:00:00 2001
From: gnehil
Date: Thu, 25 Jan 2024 15:44:53 +0800
Subject: [PATCH] update spark version for spark load to resolve cve problem

---
 fe/pom.xml                                          |  2 +-
 .../org/apache/doris/load/loadv2/dpp/SparkDpp.java  | 11 +++--------
 2 files changed, 4 insertions(+), 9 deletions(-)

diff --git a/fe/pom.xml b/fe/pom.xml
index e0018b8c1816c87..ff191b1e6694436 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -292,7 +292,7 @@ under the License.
         1.2.0
         2.3.0
         0.8.13
-        <spark.version>2.4.6</spark.version>
+        <spark.version>3.1.3</spark.version>
         3.1.3
         2.3.9
         9.35
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 33ca13cb0e43138..a5b3e33ab7d2b54 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -47,8 +47,6 @@
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.encoders.RowEncoder;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
@@ -190,7 +188,6 @@ private void writeRepartitionAndSortedRDDToParquet(JavaPairRDD<List<Object>, Obj
         // TODO(wb) should deal largeint as BigInteger instead of string when using biginteger as key,
         // data type may affect sorting logic
         StructType dstSchema = DppUtils.createDstTableSchema(indexMeta.columns, false, true);
-        ExpressionEncoder<Row> encoder = RowEncoder.apply(dstSchema);
 
         resultRDD.repartitionAndSortWithinPartitions(new BucketPartitioner(bucketKeyMap), new BucketComparator())
                 .foreachPartition((VoidFunction<Iterator<Tuple2<List<Object>, Object[]>>>) t -> {
@@ -254,15 +251,13 @@ private void writeRepartitionAndSortedRDDToParquet(JavaPairRDD<List<Object>, Obj
                             conf.set("spark.sql.parquet.outputTimestampType", "INT96");
                             ParquetWriteSupport.setSchema(dstSchema, conf);
                             ParquetWriteSupport parquetWriteSupport = new ParquetWriteSupport();
-                            parquetWriter = new ParquetWriter(new Path(tmpPath), parquetWriteSupport,
+                            parquetWriter = new ParquetWriter<>(new Path(tmpPath), parquetWriteSupport,
                                     CompressionCodecName.SNAPPY, 256 * 1024 * 1024, 16 * 1024, 1024 * 1024,
                                     true, false, WriterVersion.PARQUET_1_0, conf);
-                            if (parquetWriter != null) {
-                                LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
-                            }
+                            LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
                             lastBucketKey = curBucketKey;
                         }
-                        InternalRow internalRow = encoder.toRow(rowWithoutBucketKey);
+                        InternalRow internalRow = InternalRow.apply(rowWithoutBucketKey.toSeq());
                         parquetWriter.write(internalRow);
                     }
                     if (parquetWriter != null) {
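
Note on the encoder change above (a reviewer sketch, not part of the applied
diff): Spark 3.0 removed ExpressionEncoder.toRow, which is why the
RowEncoder / ExpressionEncoder usage could not survive the 2.4.6 -> 3.1.3
bump. Spark 3.1.3 still offers an encoder-based Row -> InternalRow conversion
through ExpressionEncoder.createSerializer(). The following is a minimal,
self-contained sketch of that alternative; the class name, schema, and row
values are made-up placeholders, not taken from SparkDpp:

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
    import org.apache.spark.sql.catalyst.encoders.RowEncoder;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class RowToInternalRowSketch {
        public static void main(String[] args) {
            // Placeholder schema standing in for the DppUtils.createDstTableSchema(...) result.
            StructType schema = new StructType()
                    .add("k", DataTypes.StringType)
                    .add("v", DataTypes.IntegerType);

            // Spark 3.x replacement for the removed encoder.toRow(row): the encoder
            // hands out a reusable serializer function (create once, reuse per row).
            ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
            scala.Function1<Row, InternalRow> toInternalRow = encoder.createSerializer();

            Row row = RowFactory.create("a", 1);
            InternalRow internalRow = toInternalRow.apply(row);

            // The serializer produces Catalyst internal values: the string column
            // comes back as a UTF8String, not a java.lang.String.
            System.out.println(internalRow.getUTF8String(0)); // a
            System.out.println(internalRow.getInt(1));        // 1
        }
    }

Unlike InternalRow.apply(row.toSeq()), which wraps the row's external JVM
objects as-is, the serializer converts values to Catalyst's internal
representations (e.g. java.lang.String -> UTF8String), which is the form that
InternalRow consumers such as ParquetWriteSupport read back via accessors like
getUTF8String and getDecimal.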