Skip to content

Commit

Permalink
fix review
Browse files Browse the repository at this point in the history
  • Loading branch information
loneylee committed Feb 10, 2025
1 parent 5d51ac4 commit 53f1ec6
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
1 change: 1 addition & 0 deletions backends-clickhouse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@
<exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/stats/*.scala</exclude>
<exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/DeltaLog.scala</exclude>
<exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/Snapshot.scala</exclude>
<exclude>src-delta-${delta.binary.version}/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala</exclude>
</excludes>
</scala>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,18 +267,17 @@ class ClickhouseOptimisticTransaction(
// TODO: val checkInvariants = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
val checkInvariants = empty2NullPlan

// TODO: DeltaOptimizedWriterExec
// No need to plan optimized write if the write command is OPTIMIZE, which aims to produce
// evenly-balanced data files already.
val physicalPlan =
if (
!isOptimize &&
if (
!isOptimize &&
shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
) {
DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog)
} else {
checkInvariants
}
) {
DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog)
} else {
checkInvariants
}

val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()

Expand Down Expand Up @@ -340,14 +339,14 @@ class ClickhouseOptimisticTransaction(

var resultFiles =
(if (optionalStatsTracker.isDefined) {
committer.addedStatuses.map {
a =>
a.copy(stats =
optionalStatsTracker.map(_.recordedStats(a.toPath.getName)).getOrElse(a.stats))
}
} else {
committer.addedStatuses
})
committer.addedStatuses.map { a =>
a.copy(stats = optionalStatsTracker.map(
_.recordedStats(a.toPath.getName)).getOrElse(a.stats))
}
}
else {
committer.addedStatuses
})
.filter {
// In some cases, we can write out an empty `inputData`. Some examples of this (though, they
// may be fixed in the future) are the MERGE command when you delete with empty source, or
Expand Down

0 comments on commit 53f1ec6

Please sign in to comment.