Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51016][SQL] Fix for incorrect results on retry for Left Outer Join with indeterministic join keys #50029

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

ahshahid
Copy link

@ahshahid ahshahid commented Feb 20, 2025

What changes were proposed in this pull request?

Added a new boolean hasIndeterminism in the Expression class, to indicate whether the expression's evaluation result was obtained using some indeterministic calculation.

The main difference between existing deterministic boolean and hasIndeterminism is , that the former determines that if same expression is evaluated more than once in a given iteration would its value change or not. Also on that basis, AttributeReference would always be deterministic.

But the latter indicates if an expression's evaluation used/uses any non deterministic component. So here even the leaf expression ( like Attribute) can have hasIndeterminism flag as true, if its pointing to some result obtained via indeterministic calculation.

To further elaborate, consider following Alias expression:

// The boolean hasInDeterminism carries the information
// that the value AttributeReference represents, has an inDeterministic
// Component to it.
val aliasExpr = new Alias (random() * Literal(3), "X")
assert aliasExpr.deterministic == false
assert aliasExpr.hasInDeterminism == true

val attribRef  = aliasExpr.toAttribute
assert attribRef.deterministic == true
assert attribRef.hasInDeterminism == true

Apart from that code of ShuffleDependency is augmented to indicate if the hashing expression is using any non deterministic component.

Why are the changes needed?

The method Stage.isIndeterminate is not returning true in case the ShuffleDependency is using Partitioner based on an inDeterministic evaluator. The bug is at Stage and RDD level.
SQL expression which is using an Attribute derived from an inDeterministic Expression, looses the information, that its Output is using an InDeterministic component or equivalently put, the Partition Evaluator function given to Partitioner is of inDeterministic nature.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests / Bug test to validate the behaviour of InDeterministic Stages

Was this patch authored or co-authored using generative AI tooling?

No

@mridulm
Copy link
Contributor

mridulm commented Feb 22, 2025

It is unclear to me why the changes to spark core are required - marking the RDD with the appropriate DeterministicLevel should be sufficient.

@ahshahid
Copy link
Author

It is unclear to me why the changes to spark core are required - marking the RDD with the appropriate DeterministicLevel should be sufficient.
In case of ShuffleMap stage, the base RDD itself might be deterministic but the Partitioner may be not.
If any of the changes present are removed, tests will fail.

ashahid added 4 commits February 26, 2025 17:55
…minism via curried constructor of AttributeReference, as per feedback. Renamed the boolean
…minism via curried constructor of AttributeReference, as per feedback. Renamed the boolean
@ahshahid
Copy link
Author

ahshahid commented Mar 1, 2025

@mridulm @squito ,
I am unsure as to what you mean by marking the RDD inDeterministic, without modifying the RDD code....

  1. There is no concrete field in the RDD which marks it inDeterministic ( The root RDD is always considered inDeterministic)
  2. Based on existing code, there is a function in RDD and overridden in MapPartitionsRDD which identifies whether RDD is deterministic or not. And that code relies on Dependency and the RDD contained in dependencies.
  3. It does not take into account anywhere, the indeterministic nature of PartitionEvaluator of the RDD.

Consider the test "SPARK-51016: ShuffleMapStage using indeterministic join keys should be INDETERMINATE", in newly added file ShuffleMapStageTest
In this case, the ShuffleMapStage contains ShuffleDependency and the corresponding RDD (MapPartitionsRDD). And MapPartitionsRDD 's depedencies is ParallelCollectionRDD.
so in the above interaction, the knowledge that the partition evaluator is inDeterministic, is embedded in the Lambda passed to MapPartitionsRDD or is present in ShuffleDepedency code ( which is modified to store that data).

For RDD to me marked as inDeterministic, what else do you have in mind ? Partitioner interface augmentation ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants