Make Literals foldable, ensure Parquet predicates pushdown #721
Conversation
Codecov Report

```
@@            Coverage Diff             @@
##           master     #721      +/- ##
==========================================
+ Coverage   95.52%   95.56%   +0.03%
==========================================
  Files          67       67
  Lines        1184     1172      -12
  Branches       39       41       +2
==========================================
- Hits         1131     1120      -11
+ Misses         53       52       -1
```
nb the patch check is due to previously untested code being seen as new code because of the "," added :)
Thanks for an awesome PR, I added a couple of comments, all test-case related. 🎉

Usually those optimization rules are injected via config, so I think it also makes sense to add an injector, letting users specify it in the Spark config (i.e. in the Databricks cluster config):

```scala
config.set("spark.sql.extensions", classOf[FramelessPushdownOptimizations].getName)
```

And we need docs!

I can help you with some of those aspects if needed, and I definitely don't want to step on your toes.
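For illustration, a minimal sketch of what such an extensions entry point could look like. The class name `FramelessPushdownOptimizations` matches the config line above, but the no-op rule body and the rest are assumptions, not the actual frameless code:

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Placeholder rule: a real implementation would rewrite frameless literal
// wrappers so the Parquet predicate pushdown can see foldable values.
object NoOpPushdownRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Entry point for spark.sql.extensions: Spark instantiates this class by name
// while building the session and calls apply(), letting us inject the rule.
class FramelessPushdownOptimizations extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit =
    extensions.injectOptimizerRule(_ => NoOpPushdownRule)
}
```

This is a registration fragment and needs a Spark runtime on the classpath to do anything.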
re Databricks cluster config: it requires an uber (and probably shaded) jar, effectively making frameless part of Databricks. I do exactly this in Quality, and it isn't so straightforward. Frameless could offer a shaded/uber jar for this purpose, but for the number of users I think it's better left in their hands, documenting that they must provide the build dependencies in that case. The experimental (for like 4 years now, I think) optimiser route is definitely easier to integrate. If more optimisations get added it could be worthwhile. I did think of one whilst doing tests - struct pushdowns: you'd have to unpack each struct field for an entire tree to do comparisons, and maps wouldn't work either. That's definitely fun code as well.
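For contrast, the experimental route mentioned above can be sketched as follows; the rule object is a placeholder, not frameless code. Rules added this way run in a late optimizer batch, which is part of why they can fire too late for the cast interplay discussed next:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Placeholder optimizer rule standing in for the frameless pushdown rule.
object NoOpPushdownRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// ExperimentalMethods.extraOptimizations is a plain mutable Seq on the
// session; appending here needs no shaded jar or spark.sql.extensions config.
spark.experimental.extraOptimizations =
  spark.experimental.extraOptimizations :+ NoOpPushdownRule
```

This is a config-style fragment; it requires a running Spark session to take effect.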
So whilst most types will work with the experimental approach, structs don't, and I assume others will also not: the experimental rules don't occur early enough - a painful lesson learnt on Quality. (I added verification tests for this; the reason is that the Literal swap happens after the cast from X1[X4.. to struct would occur, and the cast simplification rules aren't yet hit.) So, docs-wise, would you prefer a YMMV caveat on the use of experimental and more time on the extension as the preferred route? Similarly, are you OK with linking to the Quality docs on how to register on Databricks, or would you prefer a copy+paste?
I think we should document our optimization rules, with usage examples -- via config and injector, and by manually appending rules into the context.
I think it should work; something else is off. If you replace:

```scala
case class Inner(a: Int, b: Int, c: Int, d: Int)
case class Test(a: Inner)
```

The output will be:

```
== Parsed Logical Plan ==
Filter (a#9.a > 0)
+- Relation [a#9] parquet

== Analyzed Logical Plan ==
a: struct<a:int,b:int,c:int,d:int>
Filter (a#9.a > 0)
+- Relation [a#9] parquet

== Optimized Logical Plan ==
Filter (isnotnull(a#9.a) AND (a#9.a > 0))
+- Relation [a#9] parquet

== Physical Plan ==
*(1) Filter (isnotnull(a#9.a) AND (a#9.a > 0))
+- *(1) ColumnarToRow
   +- FileScan parquet [a#9] Batched: true, DataFilters: [isnotnull(a#9.a), (a#9.a > 0)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/.../subversions/git/github/.../framele..., PartitionFilters: [], PushedFilters: [IsNotNull(a.a), GreaterThan(a.a,0)], ReadSchema: struct<a:struct<a:int,b:int,c:int,d:int>>
```
That's not the same test though; that's a single nested field being compared.
@chris-twiner oh, yea that's the difference! I don't believe pushdown works for non-primitives. |
perhaps not generally, but they are possible to push down: in the test, the source.EqualTo is there when using the extension - not with experimental.
@chris-twiner yes, EqualTo pushdown is supported, but not for all of its argument types in the case of the Parquet format. So if there is some other format that supports non-primitive pushdown, it can be pushed. In the optimized-plan step the optimizer applies custom rules, and if those expanded/rewritten predicates are supported for the pushdown, they will be pushed. TBH (that's about naming), I think those two features (manual rules appending and injection via an injector) have been considered experimental since Spark 2.x (I don't know the specific version) :D But I could be wrong^
it's because the values aren't foldable:

```scala
override val foldable: Boolean = true // catalystExpr.foldable
```

In 3.3 InvokeLike introduced:

```scala
override def foldable: Boolean =
  children.forall(_.foldable) && deterministic && trustedSerializable(dataType)
```

so it's missing from 3.2. That stops timestamp and instant. isnull is foldable, and null is foldable, but named_struct is not because of the invokes. So the rule itself isn't needed; it's equivalent to foldable = true.
@chris-twiner yea, let's make it just true in this case 👍 good to know. Let's add a link to your comments into the code comments so we don't forget the decision made!
done. I guess GitHub has had enough of this for a bit though; the CI isn't running.
🔥
🔥
(upd: idk why there are two reviews)
Per the text in SPARK-40380, this is probably not correct, as it'd affect sparksql-scalapb, a user of frameless. Rather than break out another source-compatibility layer, I'll look at a compile-time elide or similar and sub in a different solution for 3.2. The question is: should the solution just accept failure on 3.2 (and document that predicate pushdown won't work on 3.2), or actually provide a backport of SPARK-40380?
@chris-twiner could you tell me more about how that issue is related to this PR, and to sparksql-scalapb?
sorry, I was trying to create an example. With foldable always true, Invokes could create unserializable expressions (ObjectType results). These would be fine when not folded, but folding will cause the query to fail, as the expression in the Literal would be non-serializable. 3.3.1 and above "fixes" that with this code in InvokeLike:

```scala
override def foldable: Boolean =
  children.forall(_.foldable) && deterministic && trustedSerializable(dataType)

// Returns true if we can trust all values of the given DataType can be serialized.
private def trustedSerializable(dt: DataType): Boolean = {
  // Right now we conservatively block all ObjectType (Java objects) regardless of
  // serializability, because the type-level info with java.io.Serializable and
  // java.io.Externalizable marker interfaces are not strong guarantees.
  // This restriction can be relaxed in the future to expose more optimizations.
  !dt.existsRecursively(_.isInstanceOf[ObjectType])
}
```

so by not calling catalystExpr.foldable we risk introducing an unserializable ObjectType and stopping the query from running at all. The example code in that JIRA will "work" in 3.2.4 but fail with a serialization error in more recent Spark versions. It's that exact failure that is stopped in folding by the above code snippet.
@chris-twiner we could backport this function:

```scala
// https://github.com/typelevel/frameless/pull/721
// TODO: remove with the Spark 3.2 support drop
def isFoldableExpressionCompat(expr: Expression): Boolean = {
  // Returns true if we can trust all values of the given DataType can be serialized.
  def trustedSerializable(dt: DataType): Boolean = {
    // Right now we conservatively block all ObjectType (Java objects) regardless of
    // serializability, because the type-level info with java.io.Serializable and
    // java.io.Externalizable marker interfaces are not strong guarantees.
    // This restriction can be relaxed in the future to expose more optimizations.
    !dt.existsRecursively(_.isInstanceOf[ObjectType])
  }
  expr.children.forall(_.foldable) && expr.deterministic && trustedSerializable(expr.dataType)
}
```

and then on the user side it is something like:

```scala
// in Lit.scala
// https://github.com/typelevel/frameless/pull/721
// TODO: replace with catalystExpr.foldable with the Spark 3.2 drop
override val foldable: Boolean = FramelessInternals.isFoldableExpressionCompat(catalystExpr)
```
That will fail on 3.2, though. It would need to be a bottom-up check with that logic called for InvokeLike.
Oh you're right. |
Let's still add this backport for the 3.2/3.3+ compat (we need that foldable check), but we'll leave a note about this issue; imo that's a Spark problem.
agreed, I've updated the comment on that and disabled the tests for 3.2. |
LGTM! Left a couple of suggestions around the dir names!
🔥 Quite a PR
indeed, but a fun journey
The rule actually works at this plan level, so no extension is required. At the time of writing, 3.x should support the getPushDowns approach; 3.5 snapshots currently do, at least.
I've also added a dependency on naked fs, to allow Windows dev without winutils. The StreamingFS class should go away with #5 on naked fs.
Closes #343