[SPARK-49960][SQL] Custom ExpressionEncoder support and TransformingEncoder fixes #50023
Conversation
…r fixes - add deprecation note
replaces #48477 with TransformingEncoder fixes. This allows all of the Frameless tests to pass when used with the backwards-compat AgnosticExpressionPathEncoder route, and all tests to pass with the Frameless AgnosticEncoder-based encoder derivation branch. One ExpressionEncoderSuite test, "transforming encoders as value class - Frameless value class as parameter use case", does not work when using a TransformingEncoder over the string field. I'll raise another issue should this be a valid use case and an actual bug.
@hvanhovell - per our convo
…ansformingEncoder anyway
sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
```diff
@@ -228,7 +236,8 @@ case class ExpressionEncoder[T](
    * returns true if `T` is serialized as struct and is not `Option` type.
    */
   def isSerializedAsStructForTopLevel: Boolean = {
-    isSerializedAsStruct && !classOf[Option[_]].isAssignableFrom(clsTag.runtimeClass)
+    isSerializedAsStruct && !classOf[Option[_]].isAssignableFrom(clsTag.runtimeClass) &&
```
`isSerializedAsStruct && !transformerOfOption(encoder)` ?
I am wondering if we should make these checks part of the AgnosticEncoder api.
I think it'd make sense. For the path-encoder backwards-compat logic I can embed/document that in the shim; the Builders could embed that too. I can take a stab at that post-RC2.
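The recursive detection being discussed could look something like the sketch below. This is a minimal standalone illustration, not Spark code: the `Mini*` types and `transformerOfOption` are stand-ins I've invented for the real `AgnosticEncoder` hierarchy, showing only the idea that a `TransformingEncoder` counts as an Option encoder when its underlying encoder (recursively) is one.

```scala
// Stand-ins for Spark's AgnosticEncoder hierarchy (illustrative assumptions only;
// the real classes live in org.apache.spark.sql.catalyst.encoders).
sealed trait MiniEncoder
case object MiniStringEncoder extends MiniEncoder
case class MiniOptionEncoder(inner: MiniEncoder) extends MiniEncoder
case class MiniTransformingEncoder(underlying: MiniEncoder) extends MiniEncoder

// A TransformingEncoder behaves like an Option encoder for the top-level
// struct decision if its underlying encoder, unwrapped recursively, is one.
def transformerOfOption(enc: MiniEncoder): Boolean = enc match {
  case MiniOptionEncoder(_)       => true
  case MiniTransformingEncoder(u) => transformerOfOption(u)
  case _                          => false
}

assert(!transformerOfOption(MiniStringEncoder))
assert(transformerOfOption(MiniOptionEncoder(MiniStringEncoder)))
assert(transformerOfOption(MiniTransformingEncoder(MiniOptionEncoder(MiniStringEncoder))))
```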
```diff
@@ -142,6 +142,19 @@ case class OptionNestedGeneric[T](list: Option[T])
 case class MapNestedGenericKey[T](list: Map[T, Int])
 case class MapNestedGenericValue[T](list: Map[Int, T])
 
+// ADT encoding for TransformingEncoder test
+trait Base {
```
For this case I really want to add some sort of UnionEncoder, one that either nests the individual implementations, or flattens them (plus a discriminator field).
It would save an intermediate representation. The problem is the scope of the discriminator field: locking it down to one field doesn't seem right, but if the field could be optionally dropped then it could be a user-generated Column. I'll take a stab at it post-RC2.
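The flattened-with-discriminator layout being discussed can be sketched at the value level. Everything below is a made-up illustration (`FlatRow`, `toFlat`, `fromFlat` are not Spark or Frameless APIs): an ADT is stored as one shared row shape whose discriminator says which subtype's columns are populated.

```scala
// An ADT like the `Base` trait in the test above.
sealed trait Base
case class A(x: Int) extends Base
case class B(s: String) extends Base

// Flattened union layout: one row shape for all subtypes, plus a
// discriminator column; unused subtype columns are null (None here).
case class FlatRow(disc: String, x: Option[Int], s: Option[String])

def toFlat(b: Base): FlatRow = b match {
  case A(x) => FlatRow("A", Some(x), None)
  case B(s) => FlatRow("B", None, Some(s))
}

def fromFlat(r: FlatRow): Base = r.disc match {
  case "A" => A(r.x.get)
  case "B" => B(r.s.get)
}

// Round-tripping proves the layout loses no information.
assert(fromFlat(toFlat(A(1))) == A(1))
assert(fromFlat(toFlat(B("hi"))) == B("hi"))
```

The nested alternative would instead be one struct column per subtype; the flattened form trades nullability for fewer nesting levels, at the cost of the discriminator-field scoping question raised above.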
sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
```diff
     assert(ds.collect().toVector === data.toVector)
   }
 
+  test("""Encoder derivation with TransformingEncoder of OptionEncoder""".stripMargin) {
```
Is this illustrating a different issue than above?
Not really, it's only testing that the recursion works in the ExpressionEncoder() detection. Happy to remove.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
Looks good overall.
I am fine with merging it as is, or I can wait a bit so you can address the comments. I would like to get this in by RC2 (which will be cut at the end of this week). Please let me know what works for you.
…te, the types are proved by round tripping anyway
fyi as you are coming online now - I've got every point of the feedback implemented (excluding the two post-RC2 notes I mentioned above: nullable top-level field and union). The test failures are long-running ones, no issue, and scalafmt works locally at least; I'm retrying the failures...
…ses (Spark or frameless)
LGTM
@chris-twiner can you fix the style issue?
@hvanhovell - yeah, done
Merging to master/4.0. Thanks!
[SPARK-49960][SQL] Custom ExpressionEncoder support and TransformingEncoder fixes

### What changes were proposed in this pull request?

4.0.0-preview2 introduced, as part of SPARK-49025 pr #47785, changes which drive ExpressionEncoder derivation purely from AgnosticEncoders. This PR adds a trait:

```scala
@DeveloperApi
trait AgnosticExpressionPathEncoder[T] extends AgnosticEncoder[T] {
  def toCatalyst(input: Expression): Expression
  def fromCatalyst(inputPath: Expression): Expression
}
```

and hooks it into the De/SerializationBuildHelper matches to allow seamless extension of non-connect custom encoders (such as [frameless](https://github.com/typelevel/frameless) or [sparksql-scalapb](https://github.com/scalapb/sparksql-scalapb)). SPARK-49960 provides the same information.

Additionally, this PR provides the fixes necessary to use TransformingEncoder as a root encoder, with an OptionEncoder, and as an ArrayType or MapType entry/key.

### Why are the changes needed?

Without this change (or similar) there is no way for custom encoders to integrate with the 4.0.0-preview2 derived encoders, something which has worked, and which devs have benefited from, since pre-2.4 days. This stops code such as Dataset.joinWith from deriving a tuple encoder that works (as the provided ExpressionEncoder is now discarded under preview2). Supplying a custom AgnosticEncoder under preview2 also fails, as only the preview2 AgnosticEncoders are supported in De/SerializationBuildHelper, triggering a MatchError.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A test was added using a "custom" string encoder and a joinWith based on an existing joinWith test. Removing the case statements in either BuildHelper will trigger the MatchError.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50023 from chris-twiner/temp/expressionEncoder_compat_TransformingEncoder_fixes.

Authored-by: Chris Twiner <chris.twiner@gmail.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
(cherry picked from commit 50a328b)
Signed-off-by: Herman van Hovell <herman@databricks.com>
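As a sketch of how a third-party library might implement the new extension point: the snippet below stubs out `Expression` and `AgnosticEncoder` so it compiles standalone (those stubs, plus `Invoke`, `Literal`, `UserId`, and `UserIdEncoder`, are illustrative assumptions, not Spark's catalyst classes); only the `AgnosticExpressionPathEncoder` signature is taken from the PR itself.

```scala
// Stubs standing in for Spark internals, for illustration only.
trait Expression
case class Literal(value: Any) extends Expression
case class Invoke(target: String, child: Expression) extends Expression

trait AgnosticEncoder[T]

// The extension point added by this PR (signature from the PR description).
trait AgnosticExpressionPathEncoder[T] extends AgnosticEncoder[T] {
  def toCatalyst(input: Expression): Expression
  def fromCatalyst(inputPath: Expression): Expression
}

// A hypothetical custom encoder for a value-class-like wrapper: serialize by
// unwrapping the field, deserialize by rewrapping the column value.
case class UserId(value: String)

object UserIdEncoder extends AgnosticExpressionPathEncoder[UserId] {
  def toCatalyst(input: Expression): Expression =
    Invoke("value", input) // read the wrapped field
  def fromCatalyst(inputPath: Expression): Expression =
    Invoke("UserId.apply", inputPath) // rebuild the wrapper
}

// The build helpers would match on AgnosticExpressionPathEncoder and delegate
// to these two methods instead of raising a MatchError.
assert(UserIdEncoder.toCatalyst(Literal("a")) == Invoke("value", Literal("a")))
assert(UserIdEncoder.fromCatalyst(Literal("a")) == Invoke("UserId.apply", Literal("a")))
```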