[NSE-207] Fix NaN in Max and Min (#495)
* fix NaN in Min and Max

* support NaN checking in Min/Max with GroupBy

* pass NaN check to native

* support convertTimestampUnit and micros_to_timestamp

* format

* refine

* add back plan checking tests
rui-mo authored Oct 25, 2021
1 parent 0ed2b92 commit 996f01c
Showing 15 changed files with 801 additions and 219 deletions.
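Background on the NaN semantics involved: Spark SQL orders NaN above every other floating-point value, so a Max that follows Spark's ordering returns NaN when one is present, while Min prefers the smaller real values. A kernel that compares with plain `<`/`>` gets this wrong, because every comparison against NaN evaluates to false and the NaN input is silently dropped. A minimal JVM-side sketch of the NaN-aware maximum the columnar kernels are expected to mirror (illustrative only, not the Gazelle native implementation):

def nanAwareMax(values: Seq[Double]): Option[Double] =
  values.reduceOption { (a, b) =>
    // Under Spark's ordering NaN compares greater than any other value,
    // so it dominates the running maximum.
    if (a.isNaN || b.isNaN) Double.NaN else math.max(a, b)
  }

A naive update such as `if (v > acc) acc = v` never fires for NaN, which is the class of bug this commit addresses.

The commit message also mentions convertTimestampUnit and micros_to_timestamp; those functions are not visible in the hunks shown below. For orientation only, converting a microseconds-since-epoch value to a java.sql.Timestamp looks roughly like this (a hypothetical helper, not the committed code):

import java.sql.Timestamp

def microsToTimestamp(micros: Long): Timestamp = {
  val seconds = Math.floorDiv(micros, 1000000L)                   // whole seconds since the epoch
  val ts = new Timestamp(seconds * 1000L)
  ts.setNanos((Math.floorMod(micros, 1000000L) * 1000L).toInt)    // keep microsecond precision
  ts
}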
@@ -70,6 +70,7 @@ class ColumnarHashAggregation(

var inputAttrQueue: scala.collection.mutable.Queue[Attribute] = _
val resultType = CodeGeneration.getResultType()
val NaN_check : Boolean = GazellePluginConfig.getConf.enableColumnarNaNCheck

def getColumnarFuncNode(expr: Expression): TreeNode = {
try {
@@ -181,7 +182,8 @@
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
TreeBuilder.makeFunction("action_max", childrenColumnarFuncNodeList.asJava, resultType)
val actionName = "action_max" + s"_$NaN_check"
TreeBuilder.makeFunction(actionName, childrenColumnarFuncNodeList.asJava, resultType)
case Min(_) =>
val childrenColumnarFuncNodeList =
mode match {
@@ -192,7 +194,8 @@
case other =>
throw new UnsupportedOperationException(s"not currently supported: $other.")
}
TreeBuilder.makeFunction("action_min", childrenColumnarFuncNodeList.asJava, resultType)
val actionName = "action_min" + s"_$NaN_check"
TreeBuilder.makeFunction(actionName, childrenColumnarFuncNodeList.asJava, resultType)
case StddevSamp(_, _) =>
mode match {
case Partial =>
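In the hunk above, the NaN-check flag is threaded to the native layer by appending it to the aggregate action name, so the builder emits action_max_true / action_min_true when enableColumnarNaNCheck is set and action_max_false / action_min_false otherwise; per the commit message, the same convention covers the grouped Min/Max path as well. A small sketch of the naming convention (how the native side dispatches on the suffix is assumed here, not shown in this diff):

val nanCheck: Boolean = true                 // stands in for GazellePluginConfig.getConf.enableColumnarNaNCheck
val maxAction = s"action_max_$nanCheck"      // "action_max_true"
val minAction = s"action_min_$nanCheck"      // "action_min_true"; "_false" when the flag is off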
@@ -178,7 +178,7 @@ case class ColumnarCollapseCodegenStages(
plan: ColumnarConditionProjectExec,
skip_smj: Boolean = false): SparkPlan = plan.child match {
case p: ColumnarBroadcastHashJoinExec
if plan.condition == null && !containsExpression(plan.projectList) =>
if plan.condition == null && plan.projectList == null =>
ColumnarBroadcastHashJoinExec(
p.leftKeys,
p.rightKeys,
@@ -21,13 +21,17 @@ import java.io.FileInputStream
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkConf, SparkFunSuite}

class ArrowColumnarBatchSerializerSuite extends SparkFunSuite with SharedSparkSession {

protected var avgBatchNumRows: SQLMetric = _
protected var outputNumRows: SQLMetric = _

override def sparkConf: SparkConf =
super.sparkConf
.set("spark.shuffle.compress", "false")

override def beforeEach() = {
avgBatchNumRows = SQLMetrics.createAverageMetric(
spark.sparkContext,
@@ -22,11 +22,11 @@ import java.net.URI
import java.nio.file.{Files, StandardOpenOption}
import java.util.Locale

import scala.collection.mutable
import com.intel.oap.execution.{ColumnarBroadcastHashJoinExec, ColumnarSortMergeJoinExec}

import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocalFileSystem, Path}

import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
@@ -708,21 +708,21 @@ class FileBasedDataSourceSuite extends QueryTest
val joinedDF = df1FromFile.join(df2FromFile, Seq("count"))
if (compressionFactor == 0.5) {
val bJoinExec = collect(joinedDF.queryExecution.executedPlan) {
case bJoin: BroadcastHashJoinExec => bJoin
case bJoin: ColumnarBroadcastHashJoinExec => bJoin
}
assert(bJoinExec.nonEmpty)
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
case smJoin: SortMergeJoinExec => smJoin
case smJoin: ColumnarSortMergeJoinExec => smJoin
}
assert(smJoinExec.isEmpty)
} else {
// compressionFactor is 1.0
val bJoinExec = collect(joinedDF.queryExecution.executedPlan) {
case bJoin: BroadcastHashJoinExec => bJoin
case bJoin: ColumnarBroadcastHashJoinExec => bJoin
}
assert(bJoinExec.isEmpty)
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
case smJoin: SortMergeJoinExec => smJoin
case smJoin: ColumnarSortMergeJoinExec => smJoin
}
assert(smJoinExec.nonEmpty)
}
@@ -19,11 +19,11 @@ package org.apache.spark.sql

import java.util.Locale

import com.intel.oap.execution.{ColumnarSortExec, ColumnarSortMergeJoinExec}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.mockito.Mockito._

import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -866,14 +866,14 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
val executed = df.queryExecution.executedPlan
val executedJoins = collect(executed) {
case j: SortMergeJoinExec => j
case j: ColumnarSortMergeJoinExec => j
}
// This only applies to the above tested queries, in which a child SortMergeJoin always
// contains the SortOrder required by its parent SortMergeJoin. Thus, SortExec should never
// appear as parent of SortMergeJoin.
executed.foreach {
case s: SortExec => s.foreach {
case j: SortMergeJoinExec => fail(
case s: ColumnarSortExec => s.foreach {
case j: ColumnarSortMergeJoinExec => fail(
s"No extra sort should be added since $j already satisfies the required ordering"
)
case _ =>
@@ -1193,7 +1193,7 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}

test("SPARK-23957 Remove redundant sort from subquery plan(scalar subquery)") {
ignore("SPARK-23957 Remove redundant sort from subquery plan(scalar subquery)") {
withTempView("t1", "t2", "t3") {
Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")