
[SPARK-23243][Core] Fix RDD.repartition() data correctness issue #22112

Closed
wants to merge 27 commits

Conversation

cloud-fan
Contributor

@cloud-fan cloud-fan commented Aug 15, 2018

What changes were proposed in this pull request?

An alternative fix for #21698

When Spark reruns tasks for an RDD, there are 3 different behaviors:

  1. determinate. Always returns the same result in the same order when rerun.
  2. unordered. Returns the same data set, in a random order, when rerun.
  3. indeterminate. Returns different results when rerun.

Normally Spark doesn't need to care about this. Spark runs stages one by one; when a task fails, it just reruns it. Although the rerun task may return a different result, users will not be surprised.

However, Spark may rerun a finished stage when seeing fetch failures. When this happens, Spark needs to rerun all the tasks of all the succeeding stages if the RDD output is indeterminate, because the input of the succeeding stages has been changed.

If the RDD output is determinate, we only need to rerun the failed tasks of the succeeding stages, because the input doesn't change.

If the RDD output is unordered, it's the same as determinate, because the shuffle partitioner is always deterministic (the round-robin partitioner is not a shuffle partitioner that extends org.apache.spark.Partitioner), so the reducers will still get the same input data set.
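For intuition, here is a minimal sketch of why a hash-style partitioner is order insensitive (it mirrors the behavior of org.apache.spark.HashPartitioner in simplified form; the class below is illustrative only, not Spark code): the target partition is a pure function of the key, so the same data set lands on the same reducers no matter what order a rerun task produces it in.

```scala
// Illustrative only: a hash-style partitioner routes each record purely by its key,
// so reordering the input of a rerun map task does not change which reducer
// receives each record (unlike a positional, round-robin assignment).
class SimpleHashPartitioner(numPartitions: Int) {
  def getPartition(key: Any): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod // keep the result non-negative
  }
}
```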

This PR fixes the failure handling for repartition, to avoid correctness issues.

repartition applies a stateful map function to generate a round-robin id, which is order sensitive and makes the RDD's output indeterminate. When a stage containing repartition is rerun, we must also rerun all the tasks of all the succeeding stages.
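For reference, a minimal sketch of that round-robin key assignment (simplified from the shuffle path of RDD.repartition / distributePartition; the standalone signature below is illustrative, not the exact Spark code). Because the key given to each element depends on the element's position in the input iterator, a rerun map task that sees its input in a different order emits different (key, element) pairs, so the reducers' inputs change:

```scala
import scala.util.Random
import scala.util.hashing

// Assign each element a round-robin key, starting from a per-partition pseudo-random
// offset. The output depends on the order in which `items` arrive, which is what makes
// the resulting RDD's output indeterminate across reruns.
def distributePartition[T](index: Int, numPartitions: Int, items: Iterator[T]): Iterator[(Int, T)] = {
  var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
  items.map { t =>
    position += 1
    (position, t)
  }
}
```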

future improvement:

  1. Currently we can't rollback and rerun a shuffle map stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341
  2. Currently we can't rollback and rerun a result stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25342
  3. We should provide a public API to allow users to tag the randomness level of the RDD's computing function.

How is this pull request tested?

a new test case

@SparkQA

SparkQA commented Aug 15, 2018

Test build #94821 has finished for PR 22112 at commit 1f9f6e5.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

haven't looked at the code yet, does it just fail with ResultTask then?

@tgravescs
Contributor

never mind, I see you have an abortStage in there for ResultTask

@tgravescs
Contributor

I need to look at it in more detail, but if it's straightforward we could at least do this short term for the repartition case.

I guess I question whether we really want to do it for zip and other things, see my comment here though: #21698

// The map stage is not idempotent, we have to rerun all the tasks for the
// failed stage to get expected result.
failedStage match {
case s: ShuffleMapStage =>
Contributor


Like we discussed, we shall also retry the partially finished succeeding stages.

Contributor


We may also have to update the logic in removeExecutorAndUnregisterOutputs.

@mridulm
Contributor

mridulm commented Aug 16, 2018

I am not sure what the definition of isIdempotent here is.

For example, from MapPartitionsRDD :

override private[spark] def isIdempotent = {
    if (inputOrderSensitive) {
      prev.isIdempotent
    } else {
      true
    }
  }

Consider:
val rdd1 = rdd.groupBy().map(...).repartition(...).filter(...).
By definition above, this would make rdd1 idempotent.
Depending on what the definition of idempotent is (partition level, record level, etc) - this can be correct or wrong code.

Similarly, I am not sure why idempotency or ordering depends on the Partitioner.
IMO we should traverse the dependency graph and rely on how ShuffledRDD is configured - whether there is a key ordering specified (applies to both global sort and per partition sort), whether it is from a checkpoint or marked for checkpoint, whether it is from a stable input source, etc.

Given these, I am finding the rest of the PR a bit hard to understand.

@jiangxb1987
Contributor

IMO we should traverse the dependency graph and rely on how ShuffledRDD is configured

A trivial point here - Since ShuffleDependency is also a DeveloperAPI, it's possible for users to write a customized RDD that behaves like ShuffleRDD, so we may want to depend on dependencies rather than RDDs.

@mridulm
Contributor

mridulm commented Aug 16, 2018

You are perfectly correct @jiangxb1987, that was a silly mistake on my part - and not trivial at all!
It should be the shuffle dependency we rely on when traversing the dependency tree, not ShuffledRDD.

@tgravescs
Contributor

Personally I don't want to talk about implementation until we decide what we want our semantics to be around the unordered operations because that affects any implementation.
If we are saying we need to fix zip and any other unordered operation that means we don't really support unordered operations and everything needs to be sorted.

I would propose we fix the things that are using the round robin type partitioning (repartition) but then unordered things like zip/MapPartitions (via user code) we document or perhaps give the user the option to sort.

@mridulm you caught the issues with zip and others and have said they need to be fixed, what are your thoughts?

@cloud-fan cloud-fan changed the title [WIP][SPARK-23243][Core] Fix RDD.repartition() data correctness issue [SPARK-23243][Core] Fix RDD.repartition() data correctness issue Aug 16, 2018
@cloud-fan
Contributor Author

I would propose we fix the things that are using the round robin type partitioning (repartition) but then unordered things like zip/MapPartitions (via user code) we document or perhaps give the user the option to sort.

I've updated my PR to follow this, it's ready for review now :)

@SparkQA

SparkQA commented Aug 16, 2018

Test build #94855 has finished for PR 22112 at commit d187de8.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan cloud-fan force-pushed the repartition branch 2 times, most recently from 7ba536e to 2407328 on August 16, 2018 16:41
@SparkQA

SparkQA commented Aug 16, 2018

Test build #94856 has finished for PR 22112 at commit 6f5d5e9.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 16, 2018

Test build #94857 has finished for PR 22112 at commit 2407328.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 16, 2018

Test build #94858 has finished for PR 22112 at commit a43acdc.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm
Contributor

mridulm commented Aug 16, 2018

I agree @tgravescs, I was looking at the implementation to understand what the expectations are w.r.t. the newly introduced methods/fields and whether they make sense; I did not see any details furnished.
I don’t think we can hack our way out of this.

I would expect a solution for repartition to also be applicable to other order-dependent closures - though we might choose to fix them later, the basic approach ideally should be transferable.

@tgravescs
Contributor

@mridulm so just to clarify: are you agreeing that we need to decide what we do with zip and the others, or are you agreeing that we should document these as unordered operations (so retries might give different results) and only fix repartition?

We can certainly add other options later but I don't want to change what we say the core zip behavior is.

@mridulm
Contributor

mridulm commented Aug 16, 2018

@tgravescs I was specifically in agreement with

Personally I don't want to talk about implementation until we decide what we want our semantics to be around the unordered operations because that affects any implementation.

and

I would propose we fix the things that are using the round robin type partitioning (repartition) but then unordered things like zip/MapPartitions (via user code) we document or perhaps give the user the option to sort.

IMO a fix in spark core for repartition should work for most (if not all) order-dependent closures - we might choose not to implement it for others due to time constraints, but the basic idea should be fairly similar.
Given this, I am fine with documenting the potential issue for the others and fixing a core subset - with the assumption that we will expand the solution to cover all of them later.

@tgravescs
Contributor

Thanks for the clarification, but I guess my point is with your last statement:

  • with assumption that we will expand solution to cover all later.

If we document this and say we support unordered operations with the caveat that failures could result in different results, my assumption is we don't necessarily have to do anything else ever (this is what I am proposing). We could decide, for instance, to add an option to sort, or, if it's not a result stage, fail more tasks to try to handle the situation, but strictly speaking we wouldn't have to.

If you think we have to fix those operations that can produce unordered output, then I think it comes back to: we just don't support unordered operations at all, and we should say that, and probably force a sort on all these operations, and possibly on all operations where user code could cause a different order on rerun.

@mridulm
Contributor

mridulm commented Aug 17, 2018

@tgravescs To understand better, are you suggesting that we do not support any API and/or user closure that depends on input order?
If yes, that would break not just repartition + shuffle, but also other publicly exposed APIs in spark core and (my guess) non-trivial aspects of mllib.

Or is it that we support repartition and possibly a few other high priority cases (sampling in mllib, for example?) and not support the rest?

My (unproven) contention is that a solution for repartition + shuffle would be a general solution (or very close to it), which will then work for all other cases with suitable modifications as required.
By "expand solution to cover all later", I was referring to these changes leveraging whatever we build for repartition in other use cases - for example, setting appropriate parameters, etc. - in the interest of time.

@cloud-fan
Contributor Author

cloud-fan commented Aug 17, 2018

Actually we can extend the solution later and I've mentioned it in my PR description.

Basically there are 3 kinds of closures:

  1. totally random
  2. always output same data set in a random order
  3. always output same data sequence (same order)

Spark is able to handle closure 1; the cost is that, whenever a fetch failure happens and a map task gets retried, Spark needs to roll back all the succeeding stages and retry them, because their input has changed. zip falls in this category, but due to time constraints, I think it's ok to document it and fix it later.

For closure 2, Spark can treat it as closure 3 if the shuffle partitioner is order insensitive like range/hash partitioner. This means, when a map task gets retried, it will produce the same data for the reducers, so we don't need to rollback all the succeeding stages. However, if the shuffle partitioner is order sensitive like round-robin, Spark has to treat it like closure 1 and rollback all the succeeding stages if a map task gets retried.

Closure 3 is already handled well by the current Spark.

In this PR, I assume all the RDDs' computing functions are closure 3, so that we don't have a performance regression. The only exception is the shuffled RDD, which outputs data in a random order because of the remote block fetching.

In the future, we can extend RDD#isIdempotent to an enum to indicate the 3 closure types, and change the FetchFailed handling logic accordingly.
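For illustration, a hedged sketch of what such an enum could look like (the names below are made up for this example; they are not an existing Spark API). The FetchFailed handling could then pick between "rerun only the missing reducer tasks" and "roll back all succeeding stages" based on this level:

```scala
// Hypothetical determinism levels for an RDD's computing function, matching the
// three closure kinds described above.
object OutputDeterminism extends Enumeration {
  val Determinate: Value = Value   // same data in the same order on every rerun
  val Unordered: Value = Value     // same data set, but the order may differ across reruns
  val Indeterminate: Value = Value // the data itself may differ across reruns
}
```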

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Aug 17, 2018

Test build #94893 has finished for PR 22112 at commit a43acdc.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

* its output by itself and use a dummy partitioner later.
*/
// This is defined as a `var` here instead of the constructor, to pass the mima check.
private[spark] var orderSensitivePartitioner: Boolean = false
Contributor


Output order is orthogonal to what the partitioner is - it is enforced by whether keyOrdering is defined in the shuffle dependency. We should not associate order sensitivity with the partitioner (which has no influence on order).

"For example, hash and range partitioners are order insensitive, round-robin partitioner is order sensitive".
There is no round robin partitioner in spark core.

Similarly

"... because it's common that a map task partitions its output by itself and use a dummy partitioner later."

Tasks do not partition - that is the responsibility of partitioner.
There can be implementations where tasks and partitioner work in tandem - but that is an impl detail of user code (distributePartition is essentially user code which happens to be bundled as part of spark).

In general, something like this would suffice here (whether rdd is ordered or not):

def isOrdered = keyOrdering.isDefined

* Spark requires the computing function to always output the same data set(the order can vary)
* given the same input data set. For example, a computing function that increases each input
* value by 1 is valid, a computing function that increases each input by a random value is
* invalid.
Contributor

@mridulm mridulm Aug 17, 2018


"a computing function that increases each input by a random value is invalid." is too stringent a restriction imo and not in line with the preceding requirement of essentially generating same output given same input data iterator.

As I mentioned before, in typical MR based systems there are three types of closures in general:

  • Truly random output - this is something MR systems do not support well except in some very specific cases: primarily due to repeatability of computation not being guaranteed.
    • I am not considering pseudo random repeatable computations here.
  • order sensitive output - task output is dependent on both input order and computation.
    • In spark core we have:
      • coalesce with shuffle (repartition uses this)
      • *sample*
      • zip*
      • random*
      • glom, take, etc.
        • (Edit): In 'typical' usage, these APIs should not be a problem - even though they are affected.
        • take is typically used in ResultStage to show an RDD (unless RDD was sorted to do top - in which case there is no impact).
        • Similarly, glom will result in re-ordering of array with all data in there: it depends on what user closure does with it. The input is changed, but typical usages should not be affected too much IMO.
      • I am ignoring the *approx* variants - since they are approximations by definition.
    • There should be a lot of usage within mllib (at least yahoo's internal BigML library did have a lot of it).
    • I vaguely remember a bunch of usages in graphx when I had last checked it a few years back.
    • I don't have much context into sql, but you should know about that better :-)
    • Note that here we assume same output order for exact same input order + tuples. There are user closures which violate this (for example, process partition and iterate result from a Map - which can change order depending on map impl) : and that is not something we support IMO.
  • record dependent - output for a single record depends only on current input record.
    • per record filter, map, etc.
    • A variant of this is where the entire partition is processed by closure - without regard to order : that is, the input iterator is consumed in mapPartitions, but closure is agnostic to tuple order (count() in spark core is a trivial example).

Order sensitive output is supported by hadoop mapreduce systems due to the sort which is done as part of shuffle (all keys are comparable) - but in spark, this requires specifying keyOrdering to explicitly enable shuffle output sort (as part of global sort or per partition sort).
This allows spark to avoid the cost when it is not applicable (if the shuffle map task is only record dependent): but in the case of order-sensitive inputs, this assumption breaks.

* Note that, `zip` violates the requirement of the RDD computing function. If the order of input
* data changes, `zip` will return different result. Because of this, Spark may return unexpected
* result if there is a shuffle after `zip`, and the shuffle failed and retried. To workaround
* this, users can call `zipPartitions` and sort the input data before zip.
Contributor


All zip methods are affected by it, not just this one.
I added a list of other methods I have used, from memory (though unfortunately it is not exhaustive).
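To make the workaround suggested in the doc comment above concrete, here is a hedged sketch of sorting both sides inside zipPartitions so the pairing no longer depends on the arrival order of shuffle output. orderInsensitiveZip is a made-up helper for this example, not a proposed API; it assumes equal partition counts and orderable elements, and it buffers each partition in memory:

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Pair elements by their sorted rank within each partition instead of by arrival order.
def orderInsensitiveZip[A: Ordering, B: Ordering : ClassTag](
    left: RDD[A], right: RDD[B]): RDD[(A, B)] = {
  left.zipPartitions(right) { (leftIter, rightIter) =>
    // Sorting fixes a deterministic order before pairing, at the cost of
    // materializing both partitions.
    leftIter.toSeq.sorted.iterator.zip(rightIter.toSeq.sorted.iterator)
  }
}
```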

* not only satisfies the requirement, but also produce the same output sequence(the output order
* can't vary) given the same input sequence. Spark assumes all the RDDs are idempotent, except
* for the shuffle RDD and RDDs derived from non-idempotent RDD.
*/
Contributor


This will mean all RDDs which are directly or indirectly reading from an unsorted shuffle output are not 'idempotent'.

Contributor Author


Yes, that is expected, unless the computing function sorts the input data. In that case, we can override isIdempotent.

dependencies.forall { dep =>
// Shuffle RDD is always considered as non-idempotent, because its computing function needs
// to fetch remote shuffle blocks, and these fetched blocks may arrive in a random order.
!dep.isInstanceOf[ShuffleDependency[_, _, _]] && dep.rdd.isIdempotent
Contributor


This is too strict.
As I discussed with @jiangxb1987, something like this would be better:

  dep =>
    dep match {
      case shuffleDep: ShuffleDependency[_, _, _] => shuffleDep.keyOrdering.isDefined
      // IIRC this is not comprehensive if checkpoint is happening as part of this job.
      case checkpointedDep: Dependency[_] if checkpointedDep.rdd.isCheckpointed => true
      case _ => dep.rdd.isIdempotent
    }

Note that this method can end up with stack overflow errors - please refer to DAGScheduler.stageDependsOn, which does a similar dependency traversal (but for a different purpose).
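As a rough illustration of that concern, a hedged sketch of an iterative traversal, loosely modeled on the shape of DAGScheduler.stageDependsOn; dependsOnUnsortedShuffle is a made-up name for this example, not proposed API. An explicit stack plus a visited set avoids both deep recursion and revisiting shared ancestors:

```scala
import scala.collection.mutable
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

def dependsOnUnsortedShuffle(root: RDD[_]): Boolean = {
  val visited = mutable.HashSet[RDD[_]]()
  val waiting = mutable.Stack[RDD[_]](root)
  var found = false
  while (waiting.nonEmpty && !found) {
    val rdd = waiting.pop()
    if (visited.add(rdd)) {
      rdd.dependencies.foreach {
        case dep: ShuffleDependency[_, _, _] =>
          // A shuffle with no key ordering delivers blocks in arrival order, so the
          // downstream input order is not reproducible; a sorted shuffle cuts the walk.
          if (dep.keyOrdering.isEmpty) found = true
        case dep =>
          waiting.push(dep.rdd)
      }
    }
  }
  found
}
```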

if (!mapStage.rdd.isIdempotent && mapStage.shuffleDep.orderSensitivePartitioner) {
def rollBackStage(stage: Stage): Unit = stage match {
case mapStage: ShuffleMapStage =>
if (mapStage.findMissingPartitions().length < mapStage.numPartitions) {
Contributor

@mridulm mridulm Aug 17, 2018


This is making an assumption that all partitions of an RDD are getting computed in a stage - which is not necessarily true in a general case (see numTasks vs numPartitions).

@tgravescs
Contributor

@cloud-fan I think you summarized it nicely, but I think you keep forgetting about the ResultTask though. It's one thing to say this is a temporary workaround and, if you have a failure in a ResultTask where we can't rerun all the reducers, then just FAIL, but I don't think that should be our long-term solution. That basically means Spark is not resilient to failures....

Basically what I'm saying is that case 3 is handled; case 2 we fix (however we decide, and eventually it has a resilient option rather than just failing the entire job, which to me is the short-term workaround).

Case 1 is what I think we need clarification on, as to me this is an API-level decision and I don't want to change our minds and confuse users. My opinion is we either say yes, it's ok to have random data but with the caveat that if things retry you might get different results, or we force it into case 2 or 3.
If we say we don't support random operations, then do we have to put the same logic on any operation where the user could cause the data to be random? That seems like a lot of operations and thus could be a big performance penalty to handle failures. I guess there is a 3rd option in that we handle it for the APIs that explicitly do this, like zip, but ignore other ones where user code could cause it (map, mapPartitions, etc.).

@mridulm thanks for clarifying, I do agree that if we have a solution to case 2, it could be used to help with case 1, but is that essentially saying we don't support case 1?

I think what you are saying is that for now we document it's ok to have random data, with the caveat that retries could return different results, but then down the road we essentially say it's not supported because we are going to force a sort. I just think that could be confusing to the user.

@tgravescs
Contributor

testing so far looks good. I'm +1 for this.

@gatorsmile
Member

Thanks! Merged to master.

@asfgit asfgit closed this in 71bd796 Sep 5, 2018
@tgravescs
Contributor

We should pull this back into Spark 2.3 at least. I don't think this is a clean cherry-pick due to the barrier scheduling stuff - would you be willing to put up a PR?

@cloud-fan
Contributor Author

I'm preparing a PR for 2.3, thanks for the reminder!

cloud-fan added a commit to cloud-fan/spark that referenced this pull request Sep 6, 2018

Closes apache#22112 from cloud-fan/repartition.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
asfgit pushed a commit that referenced this pull request Sep 7, 2018
backport #22112 to 2.3


Closes #22354 from cloud-fan/repartition.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
bersprockets pushed a commit to bersprockets/spark that referenced this pull request Sep 7, 2018
backport apache#22112 to 2.2


Closes apache#22354 from cloud-fan/repartition.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Sep 11, 2019
### What changes were proposed in this pull request?
PR #22112 fixed the todo added by PR #20393 (SPARK-23207). We can remove it now.

### Why are the changes needed?
In order not to confuse developers.

### Does this PR introduce any user-facing change?
no

### How was this patch tested?
no need to test

Closes #25755 from LinhongLiu/remove-todo.

Authored-by: Liu,Linhong <liulinhong@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
PavithraRamachandran pushed a commit to PavithraRamachandran/spark that referenced this pull request Sep 15, 2019
cloud-fan pushed a commit that referenced this pull request Sep 23, 2019
…enerate the shuffle files

After the newly added shuffle block fetching protocol in #24565, we can keep this work by extending the FetchShuffleBlocks message.

### What changes were proposed in this pull request?
In this patch, we achieve the indeterminate shuffle rerun by reusing the task attempt id (a unique id within an application) in the shuffle id, so that each shuffle write attempt has a different file name. For an indeterminate stage, when the stage resubmits, we'll clear all existing map statuses and rerun all partitions.

All changes are summarized as follows:
- Change the mapId to mapTaskAttemptId in shuffle related id.
- Record the mapTaskAttemptId in MapStatus.
- Still keep mapId in ShuffleFetcherIterator for fetch failed scenario.
- Add the determinate flag in Stage and use it in DAGScheduler and the cleaning work for the intermediate stage.

### Why are the changes needed?
This is a follow-up work for #22112's future improvement [1]: `Currently we can't rollback and rerun a shuffle map stage, and just fail.`

Spark will rerun a finished shuffle write stage when it sees fetch failures; currently, the rerun shuffle map stage will only resubmit the tasks for missing partitions and reuse the output of the other partitions. This logic is fine in most scenarios, but for indeterminate operations (like repartition), multiple shuffle write attempts may write different data, and rerunning only the missing partitions will lead to a correctness bug. So for the shuffle map stage of indeterminate operations, we need to support rolling back the shuffle map stage and re-generating the shuffle files.

### Does this PR introduce any user-facing change?
Yes, after this PR the indeterminate stage rerun will be handled by rerunning the whole stage. The original behavior was to abort the stage and fail the job.

### How was this patch tested?
- UT: Add UT for all changing code and newly added function.
- Manual Test: Also providing a manual test to verify the effect.
```
import scala.sys.process._
import org.apache.spark.TaskContext

val determinateStage0 = sc.parallelize(0 until 1000 * 1000 * 100, 10)
val indeterminateStage1 = determinateStage0.repartition(200)
val indeterminateStage2 = indeterminateStage1.repartition(200)
val indeterminateStage3 = indeterminateStage2.repartition(100)
val indeterminateStage4 = indeterminateStage3.repartition(300)
val fetchFailIndeterminateStage4 = indeterminateStage4.map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 190 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val indeterminateStage5 = fetchFailIndeterminateStage4.repartition(200)
val finalStage6 = indeterminateStage5.repartition(100).collect().distinct.length
```
It's a simple job with multiple indeterminate stages; it gets a wrong answer on old Spark versions like 2.2/2.3, and is killed after #22112. With this fix, the job can retry all indeterminate stages as in the screenshot below and get the right result.
![image](https://user-images.githubusercontent.com/4833765/63948434-3477de00-caab-11e9-9ed1-75abfe6d16bd.png)

Closes #25620 from xuanyuanking/SPARK-25341-8.27.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>