diff --git a/fe/pom.xml b/fe/pom.xml
index e0018b8c1816c87..ff191b1e6694436 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -292,7 +292,7 @@ under the License.
1.2.0
2.3.0
0.8.13
- <spark.version>2.4.6</spark.version>
+ <spark.version>3.1.3</spark.version>
3.1.3
2.3.9
9.35
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 33ca13cb0e43138..a5b3e33ab7d2b54 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -47,8 +47,6 @@
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
@@ -190,7 +188,6 @@ private void writeRepartitionAndSortedRDDToParquet(JavaPairRDD<List<Object>, Obj
// TODO(wb) should deal largeint as BigInteger instead of string when using biginteger as key,
// data type may affect sorting logic
StructType dstSchema = DppUtils.createDstTableSchema(indexMeta.columns, false, true);
- ExpressionEncoder<Row> encoder = RowEncoder.apply(dstSchema);
resultRDD.repartitionAndSortWithinPartitions(new BucketPartitioner(bucketKeyMap), new BucketComparator())
.foreachPartition((VoidFunction<Iterator<Tuple2<List<Object>, Object[]>>>) t -> {
@@ -254,15 +251,13 @@ private void writeRepartitionAndSortedRDDToParquet(JavaPairRDD<List<Object>, Obj
conf.set("spark.sql.parquet.outputTimestampType", "INT96");
ParquetWriteSupport.setSchema(dstSchema, conf);
ParquetWriteSupport parquetWriteSupport = new ParquetWriteSupport();
- parquetWriter = new ParquetWriter(new Path(tmpPath), parquetWriteSupport,
+ parquetWriter = new ParquetWriter<>(new Path(tmpPath), parquetWriteSupport,
CompressionCodecName.SNAPPY, 256 * 1024 * 1024, 16 * 1024, 1024 * 1024, true, false,
WriterVersion.PARQUET_1_0, conf);
- if (parquetWriter != null) {
- LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
- }
+ LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
lastBucketKey = curBucketKey;
}
- InternalRow internalRow = encoder.toRow(rowWithoutBucketKey);
+ InternalRow internalRow = InternalRow.apply(rowWithoutBucketKey.toSeq());
parquetWriter.write(internalRow);
}
if (parquetWriter != null) {
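
Context for the SparkDpp change: `ExpressionEncoder.toRow(Row)` was removed in Spark 3.x, which is why the encoder and the `RowEncoder`/`ExpressionEncoder` imports are dropped above. The conventional Spark 3 replacement converts a row through a serializer obtained from the encoder; a minimal sketch (the class name is illustrative, and `dstSchema` / `rowWithoutBucketKey` stand in for the variables of the same names in the surrounding code):

```java
// Minimal sketch, not part of the patch: the usual Spark 3.x replacement for the
// removed ExpressionEncoder.toRow(Row).
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.StructType;

import scala.Function1;

class RowToInternalRowSketch {
    static InternalRow toInternalRow(StructType dstSchema, Row rowWithoutBucketKey) {
        // Spark 2.4.x: RowEncoder.apply(dstSchema).toRow(rowWithoutBucketKey)
        // Spark 3.1.x: build a serializer once, then apply it per row; it runs the
        // Catalyst converters (String -> UTF8String, Timestamp -> micros, ...).
        Function1<Row, InternalRow> toRow = RowEncoder.apply(dstSchema).createSerializer();
        return toRow.apply(rowWithoutBucketKey);
    }
}
```

The patch instead builds the row directly with `InternalRow.apply(rowWithoutBucketKey.toSeq())`, which wraps the row's existing values in a `GenericInternalRow` without running those converters. That avoids the encoder entirely, but it relies on the values already being in the internal representation `ParquetWriteSupport` expects, so string, decimal, date, and timestamp columns are worth double-checking.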