Skip to content

Commit

Permalink
Backports apache#3334 to branch-1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Nov 18, 2014
1 parent e4f5695 commit bd17512
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,22 +213,27 @@ private[sql] object ParquetFilters {
Some(createEqualityFilter(right.name, left, p))
case p @ EqualTo(left: NamedExpression, right: Literal) if !left.nullable =>
Some(createEqualityFilter(left.name, right, p))

case p @ LessThan(left: Literal, right: NamedExpression) if !right.nullable =>
Some(createLessThanFilter(right.name, left, p))
Some(createGreaterThanFilter(right.name, left, p))
case p @ LessThan(left: NamedExpression, right: Literal) if !left.nullable =>
Some(createLessThanFilter(left.name, right, p))

case p @ LessThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
Some(createLessThanOrEqualFilter(right.name, left, p))
Some(createGreaterThanOrEqualFilter(right.name, left, p))
case p @ LessThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
Some(createLessThanOrEqualFilter(left.name, right, p))

case p @ GreaterThan(left: Literal, right: NamedExpression) if !right.nullable =>
Some(createGreaterThanFilter(right.name, left, p))
Some(createLessThanFilter(right.name, left, p))
case p @ GreaterThan(left: NamedExpression, right: Literal) if !left.nullable =>
Some(createGreaterThanFilter(left.name, right, p))

case p @ GreaterThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
Some(createGreaterThanOrEqualFilter(right.name, left, p))
Some(createLessThanOrEqualFilter(right.name, left, p))
case p @ GreaterThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
Some(createGreaterThanOrEqualFilter(left.name, right, p))

case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,19 @@

package org.apache.spark.sql.parquet

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}

import parquet.hadoop.ParquetFileWriter
import parquet.hadoop.util.ContextUtil
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser}
import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType}
import org.apache.spark.sql.catalyst.util.getTempFilePath
import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser}
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -453,43 +452,46 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
}

test("create RecordFilter for simple predicates") {
val attribute1 = new AttributeReference("first", IntegerType, false)()
val predicate1 = new EqualTo(attribute1, new Literal(1, IntegerType))
val filter1 = ParquetFilters.createFilter(predicate1)
assert(filter1.isDefined)
assert(filter1.get.predicate == predicate1, "predicates do not match")
assert(filter1.get.isInstanceOf[ComparisonFilter])
val cmpFilter1 = filter1.get.asInstanceOf[ComparisonFilter]
assert(cmpFilter1.columnName == "first", "column name incorrect")

val predicate2 = new LessThan(attribute1, new Literal(4, IntegerType))
val filter2 = ParquetFilters.createFilter(predicate2)
assert(filter2.isDefined)
assert(filter2.get.predicate == predicate2, "predicates do not match")
assert(filter2.get.isInstanceOf[ComparisonFilter])
val cmpFilter2 = filter2.get.asInstanceOf[ComparisonFilter]
assert(cmpFilter2.columnName == "first", "column name incorrect")

val predicate3 = new And(predicate1, predicate2)
val filter3 = ParquetFilters.createFilter(predicate3)
assert(filter3.isDefined)
assert(filter3.get.predicate == predicate3, "predicates do not match")
assert(filter3.get.isInstanceOf[AndFilter])

val predicate4 = new Or(predicate1, predicate2)
val filter4 = ParquetFilters.createFilter(predicate4)
assert(filter4.isDefined)
assert(filter4.get.predicate == predicate4, "predicates do not match")
assert(filter4.get.isInstanceOf[OrFilter])

val attribute2 = new AttributeReference("second", IntegerType, false)()
val predicate5 = new GreaterThan(attribute1, attribute2)
val badfilter = ParquetFilters.createFilter(predicate5)
assert(badfilter.isDefined === false)

val predicate6 = And(GreaterThan(attribute1, attribute2), GreaterThan(attribute1, attribute2))
val badfilter2 = ParquetFilters.createFilter(predicate6)
assert(badfilter2.isDefined === false)
def checkFilter(predicate: Predicate): Option[CatalystFilter] = {
ParquetFilters.createFilter(predicate).map { f =>
assertResult(predicate)(f.predicate)
f
}.orElse {
fail(s"filter $predicate not pushed down")
}
}

def checkComparisonFilter(predicate: Predicate, columnName: String): Unit = {
assertResult(columnName, "column name incorrect") {
checkFilter(predicate).map(_.asInstanceOf[ComparisonFilter].columnName).get
}
}

def checkInvalidFilter(predicate: Predicate): Unit = {
assert(ParquetFilters.createFilter(predicate).isEmpty)
}

val a = 'a.int.notNull
val b = 'b.int.notNull

checkComparisonFilter(a === 1, "a")
checkComparisonFilter(Literal(1) === a, "a")

checkComparisonFilter(a < 4, "a")
checkComparisonFilter(a > 4, "a")
checkComparisonFilter(a <= 4, "a")
checkComparisonFilter(a >= 4, "a")

checkComparisonFilter(Literal(4) > a, "a")
checkComparisonFilter(Literal(4) < a, "a")
checkComparisonFilter(Literal(4) >= a, "a")
checkComparisonFilter(Literal(4) <= a, "a")

checkFilter(a === 1 && a < 4)
checkFilter(a === 1 || a < 4)

checkInvalidFilter(a > b)
checkInvalidFilter((a > b) && (a > b))
}

test("test filter by predicate pushdown") {
Expand All @@ -516,6 +518,29 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
assert(result2(49)(1) === 199)
}
}
for(myval <- Seq("myint", "mylong", "mydouble", "myfloat")) {
val query1 = sql(s"SELECT * FROM testfiltersource WHERE 150 > $myval AND 100 <= $myval")
assert(
query1.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
"Top operator should be ParquetTableScan after pushdown")
val result1 = query1.collect()
assert(result1.size === 50)
assert(result1(0)(1) === 100)
assert(result1(49)(1) === 149)
val query2 = sql(s"SELECT * FROM testfiltersource WHERE 150 < $myval AND 200 >= $myval")
assert(
query2.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
"Top operator should be ParquetTableScan after pushdown")
val result2 = query2.collect()
assert(result2.size === 50)
if (myval == "myint" || myval == "mylong") {
assert(result2(0)(1) === 151)
assert(result2(49)(1) === 200)
} else {
assert(result2(0)(1) === 150)
assert(result2(49)(1) === 199)
}
}
for(myval <- Seq("myint", "mylong")) {
val query3 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190 OR $myval < 10")
assert(
Expand Down

0 comments on commit bd17512

Please sign in to comment.