[SPARK-8029] Robust shuffle writer #9610
Conversation
Test build #2031 has finished for PR 9610 at commit
@@ -93,6 +95,10 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
    } {
      out.close()
    }
    indexFile.deleteOnExit()
    if (!tmp.renameTo(indexFile)) {
      throw new IOException(s"fail to rename index file $tmp to $indexFile")
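The hunk above stages the index to a temp file and then renames it into place. A minimal standalone sketch of that pattern, assuming the index is a sequence of longs (`writeIndexFile` is a hypothetical name, not a Spark API):

```scala
import java.io.{DataOutputStream, File, FileOutputStream, IOException}

// Minimal sketch of the staged write in the hunk above. The temp file is
// created in the same directory as indexFile so the rename stays on one
// filesystem and can be atomic. writeIndexFile is a hypothetical helper.
def writeIndexFile(indexFile: File, offsets: Array[Long]): Unit = {
  val tmp = File.createTempFile(indexFile.getName, ".tmp", indexFile.getParentFile)
  val out = new DataOutputStream(new FileOutputStream(tmp))
  try {
    offsets.foreach(out.writeLong)
  } finally {
    out.close()
  }
  indexFile.delete() // drop any file left by a previous attempt
  if (!tmp.renameTo(indexFile)) {
    throw new IOException(s"fail to rename index file $tmp to $indexFile")
  }
}
```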
This will just kill the task, right? Both tasks are actually fine, and the overall job should continue if either one succeeds. Instead, this will lead to the task getting retried, potentially failing up to 4 times, even though it actually finished successfully from another task set. You could handle this in the scheduler, but that would add some complexity.
There is very little chance that two concurrent tasks will call renameTo at the same time. Even if they do, one of them will succeed, the scheduler will mark the partition as successful, and the failure will be ignored (not retried).
Can you test for this? I think the worry was about different TaskSets attempting the same map stage. Imagine that attempt 1 of the stage successfully completes a task, and sends back a map output status, but that status gets ignored because that stage attempt got cancelled. Attempt 2 might then fail to send a new status for it.
There seem to be two ways to fix it if this problem can actually occur -- either add MapOutputStatuses even from failed task sets or mark this new task as successful if a file exists.
On Thu, Nov 12, 2015 at 8:30 AM, Matei Zaharia notifications@github.com wrote:

> In core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala, #9610 (comment): Can you test for this? I think the worry was about different TaskSets attempting the same map stage. Imagine that attempt 1 of the stage successfully completes a task, and sends back a map output status, but that status gets ignored because that stage attempt got cancelled. Attempt 2 might then fail to send a new status for it. There seem to be two ways to fix it if this problem can actually occur -- either add MapOutputStatuses even from failed task sets or mark this new task as successful if a file exists.

After this PR, the second attempt of the same task will return SUCCESS with a new MapOutputStatus, which could be different from the previous attempt (having different partition sizes). Since we do not use the exact sizes (they may be lossily compressed), I think it's fine.

Reply to this email directly or view it on GitHub: https://github.com/apache/spark/pull/9610/files#r44678796

- Davies
Test build #2032 has finished for PR 9610 at commit
Test build #45600 has finished for PR 9610 at commit
Test build #45618 has finished for PR 9610 at commit
Test build #2040 has finished for PR 9610 at commit
Test build #2043 has finished for PR 9610 at commit
    dataFile.delete()
    }
    if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
      throw new IOException("fail to rename data file " + dataTmp + " to " + dataFile)
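The conditional rename above can be read as a commit step: clear out any previous attempt's data file, then move this attempt's temp output (if any) into place. A standalone sketch (`commitDataFile` is a hypothetical name, not a Spark API):

```scala
import java.io.{File, IOException}

// Standalone sketch of the commit step in the hunk above: delete any data
// file left by a previous attempt, then rename this attempt's temp output
// into place. dataTmp may be null when the map output is empty.
// commitDataFile is a hypothetical name.
def commitDataFile(dataFile: File, dataTmp: File): Unit = {
  if (dataFile.exists()) {
    dataFile.delete() // last-attempt-wins for the data file
  }
  if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
    throw new IOException("fail to rename data file " + dataTmp + " to " + dataFile)
  }
}
```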
I don't think there is a particular flaw here, but it's a bit hard to follow since it's a mix of first-attempt-wins and last-attempt-wins: first attempt wins if there are both a data file and an index file; last attempt wins if there is only an index file. The problem with last-attempt-wins is that this delete will fail on Windows if the file is open for reading, I believe. We can't entirely get around that, because SPARK-4085 always requires us to delete some files that might be open, in which case we hope that we don't run into this race again on the next retry. It would be nice to minimize that case, though. We'd be closer to first-attempt-wins if we always wrote a dataFile, even an empty one when dataTmp == null.
There is also an issue with mapStatus and non-deterministic data. It might not matter which output you get, but the map status should be consistent with the data that is read. If attempt 1 writes non-empty outputs a, b, c, and attempt 2 writes non-empty outputs d, e, f (which are not committed), the reduce tasks might get the map status for attempt 2, look for outputs d, e, f, and get nothing but empty blocks. Matei had suggested writing the map status to a file, so that subsequent attempts always return the map status corresponding to the first successful attempt.
You can take this test case if you like: https://github.com/squito/spark/blob/SPARK-8029_first_wins/core/src/test/scala/org/apache/spark/ShuffleSuite.scala#L351 I'd also add more test cases to cover the various paths through the output commit code. And I think that
Test build #2053 has started for PR 9610 at commit
@squito Thanks for reviewing this. I have included your regression test here and also added tests for the resolver.
    val dataFile = getDataFile(shuffleId, mapId)
    // Note: there is only one IndexShuffleBlockResolver per executor
    synchronized {
      val existedLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
existingLengths
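The checkIndexAndDataFile call above is what makes the first successful attempt win. A hypothetical sketch of such a check, assuming the index file stores (blocks + 1) cumulative offsets as longs (the function name and exact layout here are assumptions of this sketch):

```scala
import java.io.{DataInputStream, File, FileInputStream}

// Hypothetical sketch of the first-attempt-wins check discussed above:
// an existing index+data pair is trusted only if the offsets are
// well-formed and the last offset equals the data file's length.
// Returns the block lengths on a match, or null when the existing
// files are unusable and this attempt's output should be committed.
def checkIndexAndData(index: File, data: File, blocks: Int): Array[Long] = {
  if (index.length() != (blocks + 1) * 8L) return null
  val lengths = new Array[Long](blocks)
  val in = new DataInputStream(new FileInputStream(index))
  try {
    var offset = in.readLong()   // first offset must be 0
    if (offset != 0L) return null
    var i = 0
    while (i < blocks) {
      val next = in.readLong()
      lengths(i) = next - offset // block i spans [offset, next)
      offset = next
      i += 1
    }
  } finally {
    in.close()
  }
  // the data file must be exactly as long as the sum of block lengths
  if (data.length() == lengths.sum) lengths else null
}
```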
@@ -65,11 +66,11 @@ private[spark] class SortShuffleWriter[K, V, C](
    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val tmp = Utils.tempFileWith(output)
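A plausible sketch of a Utils.tempFileWith-style helper: it derives the temp path from the target path so both live in the same directory and a later renameTo does not cross filesystems (the exact `.<uuid>` suffix is an assumption of this sketch):

```scala
import java.io.File
import java.util.UUID

// Plausible sketch of a tempFileWith helper: the temp path shares the
// target's directory, keeping a later renameTo within one filesystem.
def tempFileWith(path: File): File =
  new File(path.getAbsolutePath + "." + UUID.randomUUID())
```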
Can you call these outputTmp or something so it's slightly easier to follow? (here and other places)
There is only one output file here, so I think it's obvious.
@davies I took a pass and I find the approach correct and simple. I did a close review and confirmed that all four
Test build #45800 has finished for PR 9610 at commit
@andrewor14 Thanks, this looks much better now.
Test build #45824 has finished for PR 9610 at commit
The failed test is not related. I'm merging this into master and will create another PR for the other branches.
Currently, all the shuffle writers write to the target path directly, so the file could be corrupted by another attempt of the same partition on the same executor. They should write to a temporary file and then rename it to the target path, as we do in the output committer. To make the rename atomic, the temporary file should be created in the same local directory (FileSystem). This PR is based on #9214 , thanks to squito . Closes #9214 Author: Davies Liu <davies@databricks.com> Closes #9610 from davies/safe_shuffle.
Test build #2054 has finished for PR 9610 at commit
@davies : If we want to merge two data files (one from the first action of an RDD and another from a second action of the same RDD), how can I do that? And do I need to do anything with the index file?
SPARK-8029 (#9610) modified shuffle writers to first stage their data to a temporary file in the same directory as the final destination file and then to atomically rename this temporary file at the end of the write job. However, this change introduced the potential for the temporary output file to be leaked if an exception occurs during the write because the shuffle writers' existing error cleanup code doesn't handle deletion of the temp file. This patch avoids this potential cause of disk-space leaks by adding `finally` blocks to ensure that temp files are always deleted if they haven't been renamed. Author: Josh Rosen <joshrosen@databricks.com> Closes #15104 from JoshRosen/cleanup-tmp-data-file-in-shuffle-writer. (cherry picked from commit 5b8f737) Signed-off-by: Josh Rosen <joshrosen@databricks.com>
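The fix described above boils down to wrapping the write-and-rename in a `finally` that deletes the temp file whenever it was never renamed into place. A self-contained sketch under that assumption (`commitBytes` is a hypothetical name, not the patch's actual code):

```scala
import java.io.{File, IOException}
import java.nio.file.Files

// Sketch of the leak fix described above: the finally block deletes the
// temp file if it still exists, so a failed write or rename cannot leak
// disk space, while a successful rename makes the delete a no-op.
def commitBytes(target: File, bytes: Array[Byte]): Unit = {
  val tmp = File.createTempFile(target.getName, ".tmp", target.getParentFile)
  try {
    Files.write(tmp.toPath, bytes)
    if (!tmp.renameTo(target)) {
      throw new IOException(s"fail to rename $tmp to $target")
    }
  } finally {
    if (tmp.exists()) tmp.delete() // cleans up after any failure path
  }
}
```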