Skip to content

Commit

Permalink
[CORE-8569][CH] Support DeltaOptimizedWriterTransformer (#8570)
Browse files Browse the repository at this point in the history
* [CORE-8569][CH] Support DeltaOptimizedWriterTransformer
  • Loading branch information
loneylee authored Feb 11, 2025
1 parent 8df00be commit 62d9696
Show file tree
Hide file tree
Showing 27 changed files with 829 additions and 14 deletions.
39 changes: 32 additions & 7 deletions backends-clickhouse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,28 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>delta</id>
<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-delta</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-delta</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>${delta.package.name}_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
</profiles>

<dependencies>
Expand Down Expand Up @@ -464,15 +486,18 @@
<includes>
<include>src/main/scala/**/*.scala</include>
<include>src/test/scala/**/*.scala</include>
<include>src-delta-${delta.binary.version}/main/delta/**/*.scala</include>
<include>src-delta-${delta.binary.version}/test/delta/**/*.scala</include>
<include>src-delta/main/scala/**/*.scala</include>
<include>src-delta/test/scala/**/*.scala</include>
<include>src-delta-${delta.binary.version}/main/scala/**/*.scala</include>
<include>src-delta-${delta.binary.version}/test/scala/**/*.scala</include>
</includes>
<excludes>
<exclude>src-delta-${delta.binary.version}/main/delta/org/apache/spark/sql/delta/commands/*.scala</exclude>
<exclude>src-delta-${delta.binary.version}/main/delta/org/apache/spark/sql/delta/commands/merge/*.scala</exclude>
<exclude>src-delta-${delta.binary.version}/main/delta/org/apache/spark/sql/delta/stats/*.scala</exclude>
<exclude>src-delta-${delta.binary.version}/main/delta/org/apache/spark/sql/delta/DeltaLog.scala</exclude>
<exclude>src-delta-${delta.binary.version}/main/delta/org/apache/spark/sql/delta/Snapshot.scala</exclude>
<exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/commands/*.scala</exclude>
<exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/commands/merge/*.scala</exclude>
<exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/stats/*.scala</exclude>
<exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/DeltaLog.scala</exclude>
<exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/Snapshot.scala</exclude>
<exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala</exclude>
</excludes>
</scala>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.gluten.sql.shims.delta20.Delta20ShimProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.sql.shims.delta20

import org.apache.gluten.sql.shims.{DeltaShimProvider, DeltaShims}

class Delta20ShimProvider extends DeltaShimProvider {

override def createShim: DeltaShims = {
new Delta20Shims()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.sql.shims.delta20

import org.apache.gluten.sql.shims.DeltaShims

class Delta20Shims extends DeltaShims {}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDi
import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.collection.BitSet

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types.StructType

import java.{util => ju}

import scala.collection.JavaConverters._

@SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.gluten.sql.shims.delta23.Delta23ShimProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.sql.shims.delta23

import org.apache.gluten.sql.shims.{DeltaShimProvider, DeltaShims}

class Delta23ShimProvider extends DeltaShimProvider {

override def createShim: DeltaShims = {
new Delta23Shims()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.sql.shims.delta23

import org.apache.gluten.sql.shims.DeltaShims

class Delta23Shims extends DeltaShims {}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDi
import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePartsPartitionsUtil
import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.collection.BitSet

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types.StructType

import java.{util => ju}

import scala.collection.JavaConverters._

@SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.gluten.sql.shims.delta32.Delta32ShimProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.sql.shims.delta32

import org.apache.gluten.sql.shims.{DeltaShimProvider, DeltaShims}

class Delta32ShimProvider extends DeltaShimProvider {

override def createShim: DeltaShims = {
new Delta32Shims()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.sql.shims.delta32

import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.sql.shims.DeltaShims

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.perf.DeltaOptimizedWriterTransformer

class Delta32Shims extends DeltaShims {
override def supportDeltaOptimizedWriterExec(plan: SparkPlan): Boolean =
DeltaOptimizedWriterTransformer.support(plan)

override def offloadDeltaOptimizedWriterExec(plan: SparkPlan): GlutenPlan = {
DeltaOptimizedWriterTransformer.from(plan)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,6 @@ class ClickhouseOptimisticTransaction(
// TODO: val checkInvariants = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
val checkInvariants = empty2NullPlan

// TODO: DeltaOptimizedWriterExec
// No need to plan optimized write if the write command is OPTIMIZE, which aims to produce
// evenly-balanced data files already.
val physicalPlan =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreePart
import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types.StructType

import java.{util => ju}

import scala.collection.JavaConverters._

@SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.clickhouse.RuntimeSettings
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.vectorized.NativeExpressionEvaluator
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.catalyst.InternalRow
Expand All @@ -30,6 +30,8 @@ import org.apache.spark.sql.delta.stats.{DeltaFileStatistics, DeltaJobStatistics
import org.apache.spark.sql.execution.datasources.{ExecutedWriteSummary, WriteJobDescription, WriteTaskResult}
import org.apache.spark.util.Utils

import org.apache.hadoop.fs.Path

import scala.collection.mutable.ArrayBuffer

case class DeltaFileCommitInfo(committer: FileDelayedCommitProtocol)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package org.apache.spark.sql.execution.datasources

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, GlutenPlan, SortExecTransformer}

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions}
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

class DeltaV1WritesSuite extends GlutenClickHouseWholeStageTransformerSuite {

Expand All @@ -29,6 +32,14 @@ class DeltaV1WritesSuite extends GlutenClickHouseWholeStageTransformerSuite {
override protected def sparkConf: SparkConf = {
super.sparkConf
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
.set(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseSparkCatalog")
.set("spark.databricks.delta.maxSnapshotLineageLength", "20")
.set("spark.databricks.delta.snapshotPartitions", "1")
.set("spark.databricks.delta.properties.defaults.checkpointInterval", "5")
.set("spark.databricks.delta.stalenessLimit", "3600000")
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
}

override def beforeAll(): Unit = {
Expand Down Expand Up @@ -97,4 +108,44 @@ class DeltaV1WritesSuite extends GlutenClickHouseWholeStageTransformerSuite {
check(df.orderBy("j", "k").queryExecution.sparkPlan)
}

protected def checkResult(df: DataFrame, numFileCheck: Long => Boolean, dir: String): Unit = {
val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, dir)
val files = snapshot.numOfFiles
assert(numFileCheck(files), s"file check failed: received $files")

checkAnswer(
spark.read.format("delta").load(dir),
df
)
}

test("test optimize write") {
withTempDir {
dir =>
val df = sql("select * from t0").toDF()
df.write
.format("delta")
.option(DeltaOptions.OPTIMIZE_WRITE_OPTION, "true")
.save(dir.getPath)
checkResult(df, numFileCheck = _ === 1, dir.getPath)
}
}

test("optimize write - partitioned write") {
withTempDir {
dir =>
val df = spark
.range(0, 100, 1, 4)
.withColumn("part", 'id % 5)

df.write
.partitionBy("part")
.option(DeltaOptions.OPTIMIZE_WRITE_OPTION, "true")
.format("delta")
.save(dir.getPath)

checkResult(df, numFileCheck = _ <= 5, dir.getPath)
}
}

}
Loading

0 comments on commit 62d9696

Please sign in to comment.