
[SPARK-23207][SQL] Shuffle+Repartition on a DataFrame could lead to incorrect answers #20393

Closed
wants to merge 2 commits

Conversation

jiangxb1987
Contributor

What changes were proposed in this pull request?

Currently, shuffle repartition uses RoundRobinPartitioning, and the generated result is nondeterministic because the ordering of the input rows is not determined.

The bug can be triggered when a repartition call follows a shuffle (which leads to non-deterministic row ordering), as in the pattern below:
upstream stage -> repartition stage -> result stage
(-> indicates a shuffle)
When one of the executor processes goes down, some tasks in the repartition stage will be retried and generate an inconsistent ordering, and then some tasks in the result stage will be retried and produce different data.

The following code returns 931532, instead of 1000000:

```
import scala.sys.process._

import org.apache.spark.TaskContext
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
```

In this PR, we propose the most straightforward way to fix this problem: perform a local sort before partitioning. Once we make the input row ordering deterministic, the function from rows to partitions is fully deterministic too.
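To see why input order matters, here is a toy, non-Spark sketch of round-robin assignment (the function name and shapes are illustrative, not Spark's internal API):

```
// Round-robin assignment is a function of input *order*: the i-th row goes to
// partition i % numPartitions, so reordered input changes the row -> partition map.
def roundRobin[T](rows: Seq[T], numPartitions: Int): Map[Int, Seq[T]] =
  rows.zipWithIndex
    .groupBy { case (_, i) => i % numPartitions }
    .map { case (p, xs) => (p, xs.map(_._1)) }

roundRobin(Seq("a", "b", "c", "d"), 2) // Map(0 -> Seq(a, c), 1 -> Seq(b, d))
roundRobin(Seq("b", "a", "d", "c"), 2) // Map(0 -> Seq(b, d), 1 -> Seq(a, c))
// A deterministic local sort pins the input order, so reruns agree:
roundRobin(Seq("b", "a", "d", "c").sorted, 2) // always Map(0 -> Seq(a, c), 1 -> Seq(b, d))
```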

The downside of this approach is that, with the extra local sort inserted, the performance of repartition() will go down, so we add a new config named spark.sql.execution.sortBeforeRepartition to control whether this patch is applied. It is enabled by default so that behavior is safe by default, but users may turn it off to avoid the performance regression.
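For example, users who accept the risk under task retries can opt out (using the config name introduced in this PR):

```
// Enabled by default; turn off only if repartition performance matters more
// than correctness under task retries.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
```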

This patch also changes the output row ordering of repartition(), which causes a number of test cases to fail because they compare results directly.

How was this patch tested?

Added a unit test in ExchangeSuite.

With this patch (and spark.sql.execution.sortBeforeRepartition set to true), the following query returns 1000000:

```
import scala.sys.process._

import org.apache.spark.TaskContext

spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")

val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()

res7: Long = 1000000
```

@SparkQA

SparkQA commented Jan 25, 2018

Test build #86635 has finished for PR 20393 at commit 7fd964e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class RecordBinaryComparator extends RecordComparator

@viirya
Member

viirya commented Jan 25, 2018

retest this please.

@SparkQA

SparkQA commented Jan 25, 2018

Test build #86639 has finished for PR 20393 at commit 7fd964e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class RecordBinaryComparator extends RecordComparator

Member

@gatorsmile gatorsmile left a comment


LGTM

@shivaram
Contributor

@jiangxb1987 If I'm not wrong, this problem will also happen with RDD repartition? Will this fix also cover that?

@sameeragarwal
Member

Yes, this bug also applies to RDD repartition, but the current fix doesn't cover it (the local sort approach would be quite similar, but it'll be a completely different codepath).

@jiangxb1987 - to @shivaram 's point, it'd be great to add a TODO for later.

@sameeragarwal
Member

LGTM, thanks!

@shivaram
Contributor

@sameeragarwal I think we should wait for the RDD fix for 2.3 as well?

@jiangxb1987
Contributor Author

I added a TODO on this, so we can keep this fix for now, and I'll continue working on the RDD path.

@SparkQA

SparkQA commented Jan 26, 2018

Test build #86668 has finished for PR 20393 at commit 400a766.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor Author

retest this please

@SparkQA

SparkQA commented Jan 26, 2018

Test build #86682 has finished for PR 20393 at commit 400a766.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor Author

Actually, a similar approach cannot be applied to fix RDD.repartition(): in an RDD[T], the data type T can be non-comparable, so we are not able to perform a local sort before the actual repartitioning.
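A minimal sketch of the obstacle, assuming a spark-shell session with `sc` in scope (the `Widget` class is hypothetical):

```
// RDD[T] for an arbitrary T: no Ordering[T] exists, so there is nothing to
// locally sort by before the round-robin assignment.
class Widget(val payload: Array[Byte]) // no Ordering[Widget] defined

val rdd = sc.parallelize(Seq(new Widget(Array[Byte](1)), new Widget(Array[Byte](2))))

// The following line would not compile: no implicit Ordering defined for Widget.
// rdd.sortBy(identity)
```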

I'm stepping back to investigate other approaches that require some refactoring of the Core module, but I don't think it is safe to ship such a change with Spark 2.3.

So my proposal is: let's include this PR in Spark 2.3 and target the follow-up work for 2.4, especially since the RDD.repartition() issue is not a regression in the latest version.

WDYT? @shivaram @sameeragarwal @rxin @mridulm

@jiangxb1987
Contributor Author

Another simple way to ensure the correctness of RDD.repartition() is to use HashPartitioning instead of the current RoundRobinPartitioning, but that will lead to a regression when the input data is skewed.
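A small local sketch of the skew concern (plain Scala, no cluster needed; the key distribution is made up):

```
// Hash partitioning is deterministic (the partition depends only on the key),
// but skewed keys all land in the same partition, defeating repartition()'s
// goal of balancing data.
val keys = Seq.fill(1000)("hot") ++ Seq("a", "b", "c")
val numPartitions = 4
val partitionSizes = keys
  .groupBy(k => math.abs(k.hashCode) % numPartitions)
  .map { case (p, ks) => (p, ks.size) }
// One partition holds ~1000 rows while the others hold at most a few.
```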

@mridulm
Contributor

mridulm commented Jan 26, 2018

@jiangxb1987 Other than hash partitioning, I don't see how this can be handled reliably ...
You are right, this is a basic correctness issue - unfortunately I never used this family of methods (coalesce, repartition, etc.) and never saw the issue.

@shivaram Any thoughts ? You might have better insights.

@mridulm
Contributor

mridulm commented Jan 26, 2018

@jiangxb1987 Btw, we could argue this is a correctness issue that has existed ever since we added repartition - so not necessarily a blocker :-)

@jiangxb1987 jiangxb1987 changed the title [SPARK-23207][SQL] Shuffle+Repartition on an RDD/DataFrame could lead to Data Loss [SPARK-23207][SQL] Shuffle+Repartition on a DataFrame could lead to Data Loss Jan 26, 2018
@sameeragarwal
Member

sameeragarwal commented Jan 26, 2018

Another (possibly cleaner) approach here would be to make the shuffle block fetch order deterministic but I agree that it might not be safe to include it in 2.3 this late.

edit: @jiangxb1987 how about calling this "... could lead to incorrect answers" instead of "data loss"?

@jiangxb1987 jiangxb1987 changed the title [SPARK-23207][SQL] Shuffle+Repartition on a DataFrame could lead to Data Loss [SPARK-23207][SQL] Shuffle+Repartition on a DataFrame could lead to incorrect answers Jan 26, 2018
@jiangxb1987
Contributor Author

Updated the title. Does it sound good to merge this PR? I'll open another one to address the RDD.repartition() issue (which will target 2.4).

@sameeragarwal
Member

LGTM but we should get a broader consensus on this. In the meantime, I'm merging this patch to master/2.3.

asfgit pushed a commit that referenced this pull request Jan 26, 2018
…ncorrect answers

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #20393 from jiangxb1987/shuffle-repartition.

(cherry picked from commit 94c67a7)
Signed-off-by: Sameer Agarwal <sameerag@apache.org>
@jiangxb1987
Contributor Author

I opened https://issues.apache.org/jira/browse/SPARK-23243 to track the RDD.repartition() patch, thanks for all the discussions! @shivaram @mridulm @sameeragarwal @gatorsmile

@asfgit asfgit closed this in 94c67a7 Jan 26, 2018
@shivaram
Contributor

I'm fine with merging this -- I just don't want this issue to be forgotten for RDDs, as I think it's a major correctness issue.

@mridulm @sameeragarwal Let's continue the discussion on the new JIRA.

@mridulm
Contributor

mridulm commented Jan 27, 2018

@sameeragarwal I am not sure we can make shuffle fetch deterministic without quite a lot of perf overhead; do you have any thoughts on how to do this, in case I am missing something here?

@sameeragarwal
Member

@mridulm one approach that Xingbo is looking into (independently of #20414) is to have the ShuffleBlockFetcherIterator remember the order of blocks it fetches and store them in that order. Given that the blocks will still be fetched in parallel, depending on the available buffer size, we'll then have to spill some out-of-order blocks on disk in order to avoid OOMs on the receiver (similar to #16989). While this would still regress performance, it might be better than the current local sort based fix. Note that I'm not arguing against the fact that hash partitioning would be the "best" fix in terms of performance, but it'd then defeat the purpose of repartition (due to skew).
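As a rough illustration of that idea (a toy model, not Spark's ShuffleBlockFetcherIterator; all names here are made up): blocks arrive in arbitrary order from parallel fetches, and the receiver emits them in a fixed, predetermined order, holding early arrivals aside (which Spark would spill to disk to bound memory):

```
import scala.collection.mutable

// arrivals: (blockId, block) pairs in whatever order the network delivers them.
// Emits blocks in ascending blockId order regardless of arrival order.
def emitInOrder[B](arrivals: Iterator[(Int, B)]): Iterator[B] = {
  val early = mutable.Map.empty[Int, B] // out-of-order arrivals (Spark: spilled)
  var next = 0
  val out = mutable.ArrayBuffer.empty[B]
  arrivals.foreach { case (id, block) =>
    early(id) = block
    while (early.contains(next)) { out += early.remove(next).get; next += 1 }
  }
  out.iterator
}

emitInOrder(Iterator(2 -> "c", 0 -> "a", 1 -> "b")).toList // List(a, b, c)
```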

kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 23, 2019
…ad of `SparkEnv.get.conf.get(SQLConf.RADIX_SORT_ENABLED)`.

## What changes were proposed in this pull request?

This is a follow-up of apache#20393.
We should read the conf `"spark.sql.sort.enableRadixSort"` from `SQLConf` instead of `SparkConf`, i.e., use `SQLConf.get.enableRadixSort` instead of `SparkEnv.get.conf.get(SQLConf.RADIX_SORT_ENABLED)`, otherwise the config is never read.

## How was this patch tested?

Existing tests.

Closes apache#23046 from ueshin/issues/SPARK-23207/conf.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit dad2d82)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Aug 1, 2019
…ad of `SparkEnv.get.conf.get(SQLConf.RADIX_SORT_ENABLED)`.
@tcondie
Contributor

tcondie commented Aug 5, 2019

@jiangxb1987 and @sameeragarwal we are seeing this issue in Spark 2.3.2 when a cache step is introduced after each repartition operation. I have not been able to repro it using the example listed in this PR and Jira. Could either of you please verify that this bug fix is complete and that adding a cache step would not affect the solution?

@tcondie
Contributor

tcondie commented Aug 5, 2019

@jiangxb1987 could you please provide a little guidance on how to run the example repro for this issue? Spark seems to fail the job entirely when the kill switch brings down the executor, which in turn triggers the need to re-execute upstream stages, and Spark seems to punt on that (i.e., not do it).

@sc-abhisheksoni

I encountered this bug in my code on Databricks:
a 20-50 node cluster running Spark 2.4.3, Scala 2.11.

I had a DataFrame with records keyed by unique IDs. Once the data was ready to write to BLOB storage, I repartitioned it to 2 partitions and wrote it out.

On reading the records back from storage, the total number of records remained the same, but the number of unique IDs decreased. Looking at the data, the repartitioning had introduced duplicate records into the data written to BLOB storage.
As described above, the problem was non-deterministic: sometimes we got the correct number of unique records, while other times some data was duplicated.

Once I removed the repartitioning, I have not encountered this issue again.

@jiangxb1987
Contributor Author

@jiangxb1987 could you please provide a little guidance on how to run the example repro for this issue? Spark seems to fail the job entirely when the kill switch brings down the executor, which in turn triggers the need to re-execute upstream stages, and Spark seems to punt on that (i.e., not do it).

IIRC, when I ran this example in a Databricks notebook, I started a cluster with 20 workers and set spark.stage.maxConsecutiveAttempts to a very big number so the failed stage keeps retrying.
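A hedged sketch of that setup (spark.stage.maxConsecutiveAttempts is a core Spark setting read at scheduler startup, so it must be set when the context is created; the value here is illustrative):

```
import org.apache.spark.sql.SparkSession

// Allow the repartition stage to keep retrying instead of failing the job
// after the default 4 consecutive stage attempts.
val spark = SparkSession.builder()
  .appName("repartition-repro")
  .config("spark.stage.maxConsecutiveAttempts", "100")
  .getOrCreate()
```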

dongjoon-hyun pushed a commit that referenced this pull request Aug 21, 2019
…n repartition case

## What changes were proposed in this pull request?

Disable using radix sort in ShuffleExchangeExec when we do repartition.
In #20393, we fixed the nondeterministic result in the shuffle repartition case by performing a local sort before repartitioning.
But the newly added sort operation uses radix sort, which is wrong here because binary data can't be compared by the prefix alone. This makes the sort unstable and fails to solve the indeterminate shuffle output problem.

### Why are the changes needed?
Fix the correctness bug caused by repartition after a shuffle.

### Does this PR introduce any user-facing change?
Yes, users will get the right result in the case of a repartition stage rerun.

## How was this patch tested?

Tested with `local-cluster[5, 2, 5120]` using the integrated test below; it returns the right answer, 100000000.
```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```

Closes #25491 from xuanyuanking/SPARK-28699-fix.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
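The prefix limitation described in that commit can be seen with a toy example (hedged: Spark's radix sort compares fixed-size 8-byte prefixes of the binary rows; the arrays here are illustrative):

```
// Two distinct records that share the same 8-byte prefix: a prefix-only
// comparison cannot order them, so ties may break differently across task
// reruns, making the "local sort" non-deterministic again.
val a = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 9)
val b = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 10)
a.take(8).sameElements(b.take(8)) // true: the prefixes tie
a.sameElements(b)                 // false: the full records differ
```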
dongjoon-hyun pushed a commit that referenced this pull request Aug 21, 2019
…n repartition case
(cherry picked from commit 2d9cc42)
dongjoon-hyun pushed a commit that referenced this pull request Aug 21, 2019
…n repartition case
(cherry picked from commit 2d9cc42)
cloud-fan pushed a commit that referenced this pull request Sep 11, 2019
### What changes were proposed in this pull request?
PR #22112 fixed the TODO added by PR #20393 (SPARK-23207). We can remove it now.

### Why are the changes needed?
In order not to confuse developers.

### Does this PR introduce any user-facing change?
no

### How was this patch tested?
no need to test

Closes #25755 from LinhongLiu/remove-todo.

Authored-by: Liu,Linhong <liulinhong@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
PavithraRamachandran pushed a commit to PavithraRamachandran/spark that referenced this pull request Sep 15, 2019
rluta pushed a commit to rluta/spark that referenced this pull request Sep 17, 2019
…n repartition case
Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 25, 2019
…2] Shuffle+Repartition on a DataFrame could lead to incorrect answers

## What changes were proposed in this pull request?

Back port of apache#20393.

## How was this patch tested?

Ran all SBT unit tests for org.apache.spark.sql.*.

Ran pyspark tests for module pyspark-sql.

Closes apache#22079 from bersprockets/SPARK-23207.

Lead-authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Co-authored-by: Bruce Robbins <bersprockets@gmail.com>
Co-authored-by: Zheng RuiFeng <ruifengz@foxmail.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 26, 2019
…2] Shuffle+Repartition on a DataFrame could lead to incorrect answers
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Sep 26, 2019
…n repartition case
Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 27, 2019
…2] Shuffle+Repartition on a DataFrame could lead to incorrect answers
mccheah pushed a commit to palantir/spark that referenced this pull request Feb 4, 2020
…n repartition case
mccheah added a commit to palantir/spark that referenced this pull request Feb 4, 2020
…n repartition case (#640)