[SPARK-20331][SQL] Enhanced Hive partition pruning predicate pushdown #17633
Conversation
Test build #75783 has finished for PR 17633 at commit
Does this work for non-Hive tables?

This is geared towards Hive partitioned tables. If there's another system that prunes table partitions based on a stringified pruning predicate, I'm unaware of it. Do you have one in mind?

Then it should work.
s"""${quoteStringLiteral(v.toString)} ${op.symbol} ${a.name}""" | ||
}.mkString(" and ") | ||
def isFoldable(expr: Expression): Boolean = | ||
(expr.dataType.isInstanceOf[IntegralType] || expr.dataType.isInstanceOf[StringType]) && |
Can this support all `AtomicType`s? From my understanding these are partition columns and can support other types besides int and string.
`IntegralType` encompasses all "integral" types, including `IntegerType`, `ByteType`, `ShortType`, etc. I'm trying to be somewhat conservative in what we support here to ensure compatibility. Is there a particular type you'd like to see supported?
@cloud-fan @ericl Hi guys. Care to review?
s"(${convert(expr1)} or ${convert(expr2)})" | ||
} | ||
|
||
filters.flatMap(f => Try(convert(f)).toOption).mkString(" and ") |
Why do we need a `Try` here?
The `convert` method throws a `MatchError` if an expression is not a compatible Hive partition filter. Otherwise it returns a compatible partition pruning string. While using exception handling in this way can certainly be considered an anti-pattern, it makes for a much simpler implementation and type signature of the `convert` method.
Shall we just make `convert` a `PartialFunction`?
Seems reasonable. Will do.
Done.
So we can get rid of `Try` now?
With a partial function, we can now do `filters.flatMap(f => convert.lift(f))`.
Or `filters.collect(convert)`. I haven't tried it, but it should work.
Ah, that's better.
Both `filters.flatMap(f => convert.lift(f))` and `filters.collect(convert)` throw a `MatchError` on the input `ds=(20170101 + 1) and h=0`.

Now, I know that in practice foldable expressions such as `20170101 + 1` are converted to literals, but the point I'm making here is that the proposed alternatives will throw a `MatchError` if the filter expression is parsed into a tree with a leaf that does not match a pattern defined by the `convert` partial function. Hence the use of `filters.flatMap(f => Try(convert(f)).toOption).mkString(" and ")`.
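To make the failure mode concrete, here is a minimal, self-contained sketch; the `Expr` hierarchy and `Unsupported` case are invented for illustration and are not Spark's actual expression tree:

```scala
import scala.util.Try

sealed trait Expr
case class Lit(v: Int) extends Expr
case class Neg(e: Expr) extends Expr   // handled recursively by convert
case class Unsupported() extends Expr  // no pattern handles this

val convert: PartialFunction[Expr, String] = {
  case Lit(v) => v.toString
  case Neg(e) => s"-(${convert(e)})"
}

val filters: Seq[Expr] = Seq(Lit(1), Neg(Unsupported()))

// isDefinedAt only tests the outer pattern, so Neg(Unsupported()) looks
// convertible; both of these then throw scala.MatchError from the nested call:
//   filters.collect(convert)
//   filters.flatMap(f => convert.lift(f))

// Wrapping the whole application in Try turns the nested failure into None:
filters.flatMap(f => Try(convert(f)).toOption).mkString(" and ")  // "1"
```

In other words, `lift` and `collect` only guard the top level of the pattern match, while the `Try` also absorbs failures from recursive calls inside a matched case.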
```scala
  expr.foldable &&
  expr.deterministic

def convertFoldable(expr: Expression): String = expr.dataType match {
```
The foldable expressions should be converted in the Optimizer, right?
Yea, to make the code simpler, we can just match literals.
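A sketch of what that simplification might look like; `convertLiteral` is a hypothetical helper name, not code from this PR, and the string quoting here is deliberately naive where the real code escapes via `quoteStringLiteral`:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.types.{IntegralType, StringType}

// Hypothetical helper. It assumes the optimizer's ConstantFolding rule has
// already collapsed foldable expressions such as (20170101 + 1) into
// Literals, so only Literals of supported types need to be matched here.
// Note: IntegralType is package-private to org.apache.spark.sql, so like the
// PR's HiveShim code this sketch would need to live inside Spark's source tree.
def convertLiteral(expr: Expression): Option[String] = expr match {
  case Literal(value, _: IntegralType) => Some(value.toString)
  case Literal(value, _: StringType)   => Some("\"" + value.toString + "\"")
  case _                               => None
}
```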
```scala
def convert(filter: Expression): String =
  filter match {
    case In(a: Attribute, exprs) if exprs.forall(isFoldable) =>
      val or = exprs.map(expr => s"${a.name} = ${convertFoldable(expr)}").reduce(_ + " or " + _)
```
No need to quote the column names?
Good point. You mean with backticks, like ``s"`${a.name}` = ${convertFoldable(expr)}"``?
See `AttributeReference.sql`; we should use `quoteIdentifier`.
In testing this in a modified form of `HiveClientSuite`, Hive is complaining that it can't parse the predicate. Specifically, the error message in the exception it's throwing is

```
Error parsing partition filter : line 1:3 no viable alternative at character '`'
```

The filter string it's trying to parse is `` `ds` = 20170101 ``.
Could you check whether there are any limits on the predicates we can pass to Hive? Are they the same for all our supported Hive metastore versions?
```scala
  assert(filteredPartitions.size == testPartitionCount)
}

test("getPartitionsByFilter: ds=20170101") {
```
Will these tests be executed on all supported Hive versions?
> Will these tests be executed on all supported Hive versions?

No. That's something I'll look into.
They will now. See `HiveClientSuites.scala` in this PR.
Hi guys. Sorry for the lack of updates on this. I've been held up with other responsibilities this past week. I'm planning to push a new commit today or tomorrow.
Force-pushed from 1a7663d to a4cdfb0.
I've pushed a new commit removing the logic for handling "foldables", since these are evaluated earlier in planning. I've also removed the modifications I made to
There are, and a while back I found something in the way of documentation or a grammar that specifies the accepted predicates. I'll dig that up.
I don't know yet. I have cross-version compatibility checking on my to-do list.
Test build #76706 has finished for PR 17633 at commit
```scala
  }.mkString(" and ")

def isExtractable(expr: Expression): Boolean =
  expr match {
    case Literal(_, _: IntegralType) | Literal(_, _: StringType) => true
```
What about float/double/decimal?
I'm going to look into support for that. FWIW, it's not supported in the current codebase, so omitting it wouldn't be a regression.
I suggest we omit support for this at this time to reduce the necessary testing and reviewing footprint. We can add support for other data types as part of a new Jira issue and PR. Okay?
Hey guys, just a quick update. I made good progress on implementing multi-version testing today, but it's not quite ready. I'm going to be on leave from tomorrow through the rest of next week, so I'm doubtful I'll push anything new until May 22nd.
You can add the test cases to
Force-pushed from a4cdfb0 to 4f802a5.
Test build #77199 has started for PR 17633 at commit
@gatorsmile, I've refactored Hive version-specific testing to make it easier to add new Hive test suites which test functionality for a collection of Hive versions. I did not feel adding these partition filtering tests to
```scala
// Should this be in a beforeAll() method instead?
```
Should this be in a `beforeAll()` method instead? This actually initializes `client` for use by other tests. Putting that in a `beforeAll()` method feels more appropriate.
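For reference, a minimal sketch of the `beforeAll()` alternative under discussion; the suite name, `HiveClientStandIn`, and `buildClient` are invented stand-ins, not the actual test code:

```scala
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class HivePartitionFilteringSuiteSketch extends FunSuite with BeforeAndAfterAll {
  // Stand-ins for the real version-specific HiveClient construction.
  private class HiveClientStandIn
  private def buildClient(): HiveClientStandIn = new HiveClientStandIn

  private var client: HiveClientStandIn = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    client = buildClient()  // shared setup runs once, before any test
  }

  test("getPartitionsByFilter: ds=20170101") {
    assert(client != null)  // every test can rely on client being initialized
  }
}
```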
I think this build was aborted because of the emergency Jenkins restart, as reported on the Spark dev mailing list. Retest, please?
retest this please
Test build #77242 has finished for PR 17633 at commit
Force-pushed from 4f802a5 to 0c4040c.
Rebased to resolve merge conflicts.
Test build #77262 has finished for PR 17633 at commit
```scala
def convert(filter: Expression): String =
  filter match {
    case In(a: Attribute, exprs) if exprs.forall(isExtractable) =>
```
Shall we check `varcharKeys` here?
Good catch. Will do.
- function and add a guard when testing In expressions against varchar attributes
- add another test case
Force-pushed from d88b8ab to a087a0f.
```scala
object ExtractableLiterals {
  def unapply(exprs: Seq[Expression]): Option[Seq[String]] = {
    exprs.map(ExtractableLiteral.unapply).foldLeft(Option(Seq.empty[String])) {
```
I'd like it to be more Java style:

```scala
val extracted = exprs.map(ExtractableLiteral.unapply)
if (extracted.exists(_.isEmpty)) {
  None
} else {
  Some(extracted.map(_.get))
}
```
Is there something wrong with the way it is now?
`foldLeft` may not be friendly to some Spark developers, but it's not a big deal.
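For readers unfamiliar with the idiom, the `foldLeft` in question is a hand-rolled "sequence": it collapses a `Seq[Option[String]]` into an `Option[Seq[String]]` that is `None` if any element is `None`. A standalone sketch:

```scala
def sequence(opts: Seq[Option[String]]): Option[Seq[String]] =
  opts.foldLeft(Option(Seq.empty[String])) {
    case (Some(acc), Some(s)) => Some(acc :+ s)  // keep accumulating
    case _                    => None            // any None poisons the result
  }

sequence(Seq(Some("a"), Some("b")))  // Some(List(a, b))
sequence(Seq(Some("a"), None))       // None
```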
```scala
}

def unapply(values: Set[Any]): Option[Seq[String]] = {
  values.toSeq.foldLeft(Option(Seq.empty[String])) {
```
ditto
```scala
      (convert.lift(expr1) ++ convert.lift(expr2)).mkString("(", " and ", ")")
    case op @ Or(expr1, expr2)
        if convert.isDefinedAt(expr1) && convert.isDefinedAt(expr2) =>
      (convert.lift(expr1) ++ convert.lift(expr2)).mkString("(", " or ", ")")
```
nit: `"(" + convert(expr1) + " or " + convert(expr2) + ")"`
Ok.
```scala
        if !varcharKeys.contains(a.name) =>
      s"""${a.name} ${op.symbol} ${quoteStringLiteral(v.toString)}"""
    case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
      s"${a.name} ${op.symbol} $value"
```
Shall we add `()` for binary comparisons?
Is there a problem with leaving them out?
nvm, I realized that `and` and `or` have lower precedence than binary operators, so it should be fine.
Test build #79493 has finished for PR 17633 at commit
to a filter string, we can safely call convert(expr) on each directly
LGTM, pending Jenkins.
Test build #79501 has finished for PR 17633 at commit
Thanks, merging to master!
@cloud-fan Can you backport this PR to 2.1 and 2.2, please? I think the patch should apply cleanly.
@mallman We don't backport such risky changes to maintenance branches. Those branches typically go through much less testing.
…on pruning predicate pushdown. This is a follow-up PR of #17633 that adds a conf `spark.sql.hive.advancedPartitionPredicatePushdown.enabled`, which can be used to turn the enhancement off. Tested by adding a test case. Author: gatorsmile <gatorsmile@gmail.com>. Closes #19547 from gatorsmile/Spark20331FollowUp.
Hi @mallman. When I try to run this test in IntelliJ IDEA, I get the following error message. Do you have any idea how to run this test in IntelliJ?
(Link to Jira: https://issues.apache.org/jira/browse/SPARK-20331)
What changes were proposed in this pull request?
Spark 2.1 introduced scalable support for Hive tables with huge numbers of partitions. Key to leveraging this support is the ability to prune unnecessary table partitions to answer queries. Spark supports a subset of the class of partition pruning predicates that the Hive metastore supports. If a user writes a query with a partition pruning predicate that is not supported by Spark, Spark falls back to loading all partitions and pruning client-side. We want to broaden Spark's current partition pruning predicate pushdown capabilities.
One of the key missing capabilities is support for disjunctions. For example, for a table partitioned by date, writing a query with a predicate like `date = 20161011 or date = 2016101` will result in Spark fetching all partitions. For a table partitioned by date and hour, querying a range of hours across dates can be quite difficult to accomplish without fetching all partition metadata.
The current partition pruning support handles only comparisons against literals. We can expand that to foldable expressions by evaluating them at planning time.
We can also implement support for the "IN" comparison by expanding it to a sequence of "OR"s.
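As a rough sketch of that IN expansion over already-converted literal strings (`expandIn` is an invented helper; the real conversion pattern-matches Catalyst expressions):

```scala
// Expand an IN over literal operands into the disjunction that is pushed
// down to the Hive metastore as a filter string.
def expandIn(column: String, values: Seq[String]): String =
  values.map(v => s"$column = $v").mkString("(", " or ", ")")

expandIn("ds", Seq("20170101", "20170102"))
// "(ds = 20170101 or ds = 20170102)"
```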
How was this patch tested?
The `HiveClientSuite` and `VersionsSuite` were refactored and simplified to make Hive client-based, version-specific testing more modular and conceptually simpler. There are now two Hive test suites: `HiveClientSuite` and `HivePartitionFilteringSuite`. These test suites have a single-argument constructor taking a `version` parameter. As such, these test suites cannot be run by themselves. Instead, they have been bundled into "aggregation" test suites which run each suite for each Hive client version. These aggregation suites are called `HiveClientSuites` and `HivePartitionFilteringSuites`. The `VersionsSuite` and `HiveClientSuite` have been refactored into each of these aggregation suites, respectively.

`HiveClientSuite` and `HivePartitionFilteringSuite` subclass a new abstract class, `HiveVersionSuite`. `HiveVersionSuite` collects functionality related to testing a single Hive version and overrides relevant test suite methods to display version-specific information.

A new trait, `HiveClientVersions`, has been added with a sequence of Hive test versions.