[SPARK-20331][SQL][FOLLOW-UP] Add a SQLConf for enhanced Hive partition pruning predicate pushdown

## What changes were proposed in this pull request?
This is a follow-up to #17633.

This PR adds a conf `spark.sql.hive.advancedPartitionPredicatePushdown.enabled`, which can be used to turn the enhancement off.
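For example (illustrative usage only, not part of this diff; assumes a Hive-enabled `SparkSession` named `spark`), the enhancement could be turned off at runtime like this:

```scala
// Disable the enhanced partition predicate pushdown added in #17633 and
// fall back to the previous basic filter conversion.
spark.conf.set("spark.sql.hive.advancedPartitionPredicatePushdown.enabled", "false")
```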

## How was this patch tested?
Add a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19547 from gatorsmile/Spark20331FollowUp.
gatorsmile committed Oct 21, 2017
1 parent d9f286d commit d8cada8
Showing 3 changed files with 64 additions and 5 deletions.
@@ -173,6 +173,13 @@ object SQLConf {
.intConf
.createWithDefault(4)

val ADVANCED_PARTITION_PREDICATE_PUSHDOWN =
buildConf("spark.sql.hive.advancedPartitionPredicatePushdown.enabled")
.internal()
.doc("When true, advanced partition predicate pushdown into Hive metastore is enabled.")
.booleanConf
.createWithDefault(true)

val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
buildConf("spark.sql.statistics.fallBackToHdfs")
.doc("If the table statistics are not available from table metadata enable fall back to hdfs." +
@@ -1092,6 +1099,9 @@ class SQLConf extends Serializable with Logging {

def limitScaleUpFactor: Int = getConf(LIMIT_SCALE_UP_FACTOR)

def advancedPartitionPredicatePushdownEnabled: Boolean =
getConf(ADVANCED_PARTITION_PREDICATE_PUSHDOWN)

def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)

def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
@@ -585,6 +585,35 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
* Unsupported predicates are skipped.
*/
def convertFilters(table: Table, filters: Seq[Expression]): String = {
if (SQLConf.get.advancedPartitionPredicatePushdownEnabled) {
convertComplexFilters(table, filters)
} else {
convertBasicFilters(table, filters)
}
}

private def convertBasicFilters(table: Table, filters: Seq[Expression]): String = {
// hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
lazy val varcharKeys = table.getPartitionKeys.asScala
.filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
.map(col => col.getName).toSet

filters.collect {
case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
s"${a.name} ${op.symbol} $v"
case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
s"$v ${op.symbol} ${a.name}"
case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType))
if !varcharKeys.contains(a.name) =>
s"""${a.name} ${op.symbol} ${quoteStringLiteral(v.toString)}"""
case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
if !varcharKeys.contains(a.name) =>
s"""${quoteStringLiteral(v.toString)} ${op.symbol} ${a.name}"""
}.mkString(" and ")
}

private def convertComplexFilters(table: Table, filters: Seq[Expression]): String = {
// hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
lazy val varcharKeys = table.getPartitionKeys.asScala
.filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
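For illustration only (not part of this commit), a minimal sketch of the kinds of predicates the two converters above handle, using hypothetical partition columns `intcol` and `strcol` and the Catalyst expression DSL as in `FiltersSuite` below:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// Hypothetical attributes standing in for Hive partition columns.
val intcol = AttributeReference("intcol", IntegerType)()
val strcol = AttributeReference("strcol", StringType)()

// Flat binary comparisons are matched by convertBasicFilters and become a
// metastore filter string such as: intcol > 1 and "a" = strcol
val basicFilters: Seq[Expression] = Seq(intcol > Literal(1), Literal("a") === strcol)

// A disjunction such as (1 = intcol or 2 = intcol) is only pushed down when
// spark.sql.hive.advancedPartitionPredicatePushdown.enabled is true; the basic
// converter does not match Or, so it yields an empty filter string instead.
val advancedOnlyFilters: Seq[Expression] = Seq(Literal(1) === intcol || Literal(2) === intcol)
```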
@@ -26,13 +26,15 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
* A set of tests for the filter conversion logic used when pushing partition pruning into the
* metastore
*/
-class FiltersSuite extends SparkFunSuite with Logging {
+class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
private val shim = new Shim_v0_13

private val testTable = new org.apache.hadoop.hive.ql.metadata.Table("default", "test")
@@ -72,10 +74,28 @@ class FiltersSuite extends SparkFunSuite with Logging {

private def filterTest(name: String, filters: Seq[Expression], result: String) = {
test(name) {
-      val converted = shim.convertFilters(testTable, filters)
-      if (converted != result) {
-        fail(
-          s"Expected filters ${filters.mkString(",")} to convert to '$result' but got '$converted'")
+      withSQLConf(SQLConf.ADVANCED_PARTITION_PREDICATE_PUSHDOWN.key -> "true") {
+        val converted = shim.convertFilters(testTable, filters)
+        if (converted != result) {
+          fail(s"Expected ${filters.mkString(",")} to convert to '$result' but got '$converted'")
+        }
}
}
}

test("turn on/off ADVANCED_PARTITION_PREDICATE_PUSHDOWN") {
import org.apache.spark.sql.catalyst.dsl.expressions._
Seq(true, false).foreach { enabled =>
withSQLConf(SQLConf.ADVANCED_PARTITION_PREDICATE_PUSHDOWN.key -> enabled.toString) {
val filters =
(Literal(1) === a("intcol", IntegerType) ||
Literal(2) === a("intcol", IntegerType)) :: Nil
val converted = shim.convertFilters(testTable, filters)
if (enabled) {
assert(converted == "(1 = intcol or 2 = intcol)")
} else {
assert(converted.isEmpty)
}
}
}
}
