[SPARK-51016][SQL] Fix for incorrect results on retry for Left Outer Join with indeterministic join keys #49708
What changes were proposed in this pull request?
The main difference between the existing `deterministic` boolean and the new flag is that the former answers: if the same expression were evaluated more than once in a given iteration, could its value change? On that basis, any leaf expression is always deterministic.
The latter instead indicates whether an expression's evaluation uses any non-deterministic component. Here even a leaf expression (such as an `Attribute`) can have the non-deterministic flag set to true, if it points to a result obtained via an indeterministic computation.
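To illustrate the distinction, here is a minimal expression-tree sketch. This is not Spark's actual `Expression` API; the names `usesNonDeterministicInput`, `BoundAttribute`, and `Add` are hypothetical, chosen only to show how the new flag can be true on a leaf while the old flag stays true:

```java
import java.util.List;

// Toy expression tree, NOT Spark's Catalyst API (names are illustrative only).
interface Expr {
    // Old notion: would re-evaluating in the same iteration change the value?
    boolean deterministic();
    // New notion: does evaluation depend on any non-deterministic component,
    // even transitively through an attribute bound to such a result?
    boolean usesNonDeterministicInput();
}

// A leaf attribute: always deterministic in the old sense, but it may carry
// the new flag if it is bound to the output of an indeterministic computation.
record BoundAttribute(String name, boolean boundToNonDeterministicResult) implements Expr {
    public boolean deterministic() { return true; }
    public boolean usesNonDeterministicInput() { return boundToNonDeterministicResult; }
}

// A composite expression: the new flag propagates up from any child.
record Add(List<Expr> children) implements Expr {
    public boolean deterministic() {
        return children.stream().allMatch(Expr::deterministic);
    }
    public boolean usesNonDeterministicInput() {
        return children.stream().anyMatch(Expr::usesNonDeterministicInput);
    }
}

public class FlagDemo {
    public static void main(String[] args) {
        // An attribute pointing at a column derived from e.g. rand():
        Expr attr = new BoundAttribute("c", true);
        Expr sum = new Add(List.of(attr, new BoundAttribute("d", false)));
        System.out.println(sum.deterministic());             // true  (old flag)
        System.out.println(sum.usesNonDeterministicInput()); // true  (new flag)
    }
}
```

The key point the sketch shows: under the old flag alone, the provenance of `attr` is invisible, which is exactly the information loss described above.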
In addition, `ShuffleDependency` is augmented to indicate whether the hashing expression uses any non-deterministic component.
Because `ReadWriteLock`s are not upgradable (from read to write), and in one particular case of successful task completion the thread owning the read lock in `Stage` invokes the code that creates a new stage attempt, which seeks the write lock, a `ThreadLocal` is introduced in the `Stage` class. It tracks threads that have already taken the read lock, so that they do not attempt to acquire the write lock.
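The locking pattern can be sketched as follows. This is a minimal illustration using a plain `java.util.concurrent.locks.ReentrantReadWriteLock`; the class and method names (`StageLockSketch`, `withReadLock`, `withWriteLockUnlessReading`) are hypothetical, not the actual `Stage` fields:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Sketch: a thread already holding the read lock must not try to acquire the
// write lock, because ReentrantReadWriteLock does not support read-to-write
// upgrade (the attempt deadlocks). A ThreadLocal remembers whether the
// current thread took the read lock.
public class StageLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ThreadLocal<Boolean> holdsReadLock = ThreadLocal.withInitial(() -> false);

    public <T> T withReadLock(Supplier<T> body) {
        lock.readLock().lock();
        holdsReadLock.set(true);
        try {
            return body.get();
        } finally {
            holdsReadLock.set(false);
            lock.readLock().unlock();
        }
    }

    // Called e.g. on the path that creates a new stage attempt. If this thread
    // already holds the read lock, run the body without seeking the write lock
    // rather than deadlocking on an impossible upgrade.
    public void withWriteLockUnlessReading(Runnable body) {
        if (holdsReadLock.get()) {
            body.run();
            return;
        }
        lock.writeLock().lock();
        try {
            body.run();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

With this shape, a task-completion thread that enters `withReadLock` and then reaches the new-attempt code path completes without blocking, while threads that never took the read lock still serialize through the write lock.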
Why are the changes needed?
This PR addresses three issues:
The method `Stage.isIndeterminate` does not return true when the `ShuffleDependency` uses a `Partitioner` based on an indeterministic attribute. The bug exists at both the `Stage` and `RDD` level: an expression that uses an `Attribute` derived from an indeterministic expression loses the information that its output depends on an indeterministic component.
Once the above issue is fixed, it exposes a race condition: a successful task completion concurrent with a task failure, in an indeterminate stage, results in only some partitions being retried instead of all of them. This causes data loss.
The race condition identified is as follows:
a) A successful result-stage task has not yet marked its partition as true in the boolean array tracking partition success/failure.
b) A concurrent failed result task, belonging to an indeterminate stage, identifies all the stages that need to / can be rolled back. For the result stage, it looks into the array of successful partitions. As none is marked true, the `ResultStage` and its dependent stages are delegated to the thread pool for retry.
c) Between the time the stages to roll back are collected and the stages are retried, the successful task marks its boolean as true.
d) The stage retry, as a result, misses the partition that was marked successful.
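The four steps above can be replayed deterministically. The following is a toy simulation, not DAGScheduler code; `missingPartitions` and `RaceReplay` are hypothetical names used only to make the window concrete:

```java
import java.util.ArrayList;
import java.util.List;

// Deterministic replay of the race window described in steps a)-d).
public class RaceReplay {
    // Partitions not yet marked successful in the tracking array.
    static List<Integer> missingPartitions(boolean[] finished) {
        List<Integer> missing = new ArrayList<>();
        for (int p = 0; p < finished.length; p++) {
            if (!finished[p]) missing.add(p);
        }
        return missing;
    }

    public static void main(String[] args) {
        boolean[] finished = new boolean[4]; // per-partition success flags

        // (a) partition 0's task has succeeded, but finished[0] is still false.

        // (b) a concurrent failure in the indeterminate stage inspects the
        //     array: no partition is marked successful, so the whole stage is
        //     scheduled for rollback and retry.
        List<Integer> seenAtRollback = missingPartitions(finished);

        // (c) before the retry actually runs, the successful task sets its flag.
        finished[0] = true;

        // (d) the retry computes the partitions to re-execute from the array
        //     as it is NOW, so partition 0 is silently skipped.
        List<Integer> retried = missingPartitions(finished);

        // Partition 0 keeps output from the old attempt while the others are
        // recomputed against freshly (and differently) shuffled indeterminate
        // data, yielding mixed, incorrect results.
        System.out.println(seenAtRollback); // [0, 1, 2, 3]
        System.out.println(retried);        // [1, 2, 3]
    }
}
```

An indeterminate stage must recompute all partitions on retry; the stale decision taken at (b) combined with the fresh read at (d) is what lets one partition escape.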
An existing test has incorrect assertions about the number of partitions retried for an indeterminate stage.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit tests / a bug test to validate the behaviour of indeterminate stages.
There is also a functional test which exposes the following. Two files are attached for the bug test.
These are needed to coax the single-VM test into reproducing the issue. They contain many interceptions and tweaks to ensure the system can hit the data-loss situation (for example, each partition writes only a shuffle file containing keys that evaluate to the same hash code, and the shuffle file is deleted at the right time).
a) If the bugrepro.patch is applied to current master and the BugTest is run, it fails immediately with an assertion failure: 6 rows show up in the result instead of 12.
b) If the bugrepro.patch is applied to this bug-fix branch, but with commits taken only up to 790aaf0 (which contains only the fix for `Stage.isIndeterminate`), the BugTest fails after one or more iterations, indicating the race condition in the DAGScheduler/Stage interaction.
c) But if the same BugTest is run on the latest commit, it passes in all 100 iterations.
bugrepro.patch
BugTest.txt
Was this patch authored or co-authored using generative AI tooling?
No