[SPARK-8029] Robust shuffle writer #9610
IndexShuffleBlockResolver.scala
@@ -21,13 +21,12 @@ import java.io._

 import com.google.common.io.ByteStreams

+import org.apache.spark.{SparkConf, SparkEnv, Logging}
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.storage._
 import org.apache.spark.util.Utils
-
-import IndexShuffleBlockResolver.NOOP_REDUCE_ID
-import org.apache.spark.{SparkEnv, Logging, SparkConf}

 /**
  * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
@@ -40,10 +39,17 @@ import IndexShuffleBlockResolver.NOOP_REDUCE_ID
  */
 // Note: Changes to the format in this file should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
-private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver
+private[spark] class IndexShuffleBlockResolver(
+    conf: SparkConf,
+    _blockManager: BlockManager = null)
+  extends ShuffleBlockResolver
   with Logging {

-  private lazy val blockManager = SparkEnv.get.blockManager
+  private lazy val blockManager = if (_blockManager == null) {
+    SparkEnv.get.blockManager
+  } else {
+    _blockManager
+  }
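The `_blockManager` constructor parameter defaults to null so that tests can inject a BlockManager without a SparkEnv. Not part of the patch: the same fallback can be expressed without an explicit null check, shown here only as a sketch of an alternative formulation.

  // Sketch of an alternative (not the patch's code): use Option instead of a null check.
  private lazy val blockManager: BlockManager =
    Option(_blockManager).getOrElse(SparkEnv.get.blockManager)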

   private val transportConf = SparkTransportConf.fromSparkConf(conf)

@@ -74,14 +80,69 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
     }
   }

+  /**
+   * Check whether the index file and the data file exist and match each other: if so, return
+   * the lengths of each block in the data file; otherwise return null.
+   */
+  private def checkIndexAndDataFile(index: File, data: File, blocks: Int): Array[Long] = {
+    val lengths = new Array[Long](blocks)
+    if (index.length() == (blocks + 1) * 8) {
+      // Read the lengths of blocks
+      val f = try {
+        new FileInputStream(index)
+      } catch {
+        case e: IOException =>
+          return null
+      }
+      val in = new DataInputStream(new BufferedInputStream(f))
+      try {
+        // Convert the offsets into lengths of each block
+        var offset = in.readLong()
+        if (offset != 0L) {
+          return null
+        }
+        var i = 0
+        while (i < blocks) {
+          val off = in.readLong()
+          lengths(i) = off - offset
+          offset = off
+          i += 1
+        }
+      } catch {
+        case e: IOException =>
+          return null
+      } finally {
+        in.close()
+      }
+
+      val size = lengths.reduce(_ + _)
+      // `length` returns 0 if the file does not exist
+      if (data.length() == size) {
+        lengths
+      } else {
+        null
+      }
+    } else {
+      null
+    }
+  }
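For orientation: the index file holds blocks + 1 cumulative offsets stored as 8-byte longs, starting at 0, which is why the method checks index.length() == (blocks + 1) * 8 and compares the data file's length against the sum of the block lengths. A small illustrative sketch with made-up sizes:

  // Illustrative sketch only (made-up block sizes), showing the relationship that
  // checkIndexAndDataFile relies on between block lengths and index-file offsets.
  val blockLengths = Array(10L, 20L, 30L)
  val offsets = blockLengths.scanLeft(0L)(_ + _)          // Array(0, 10, 30, 60)
  val expectedIndexBytes = (blockLengths.length + 1) * 8  // one 8-byte long per offset
  val expectedDataBytes = offsets.last                    // equals blockLengths.sum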

   /**
    * Write an index file with the offsets of each block, plus a final offset at the end for the
    * end of the output file. This will be used by getBlockData to figure out where each block
    * begins and ends.
+   *
+   * It will commit the data and index file as an atomic operation: either use the existing ones
+   * (the lengths of the blocks are refreshed from them), or replace them with the new ones.

Review comment: we should add that this modifies the contents of …
    * */
-  def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
+  def writeIndexFileAndCommit(
+      shuffleId: Int,
+      mapId: Int,
+      lengths: Array[Long],
+      dataTmp: File): Unit = {
     val indexFile = getIndexFile(shuffleId, mapId)
-    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
+    val indexTmp = Utils.tempFileWith(indexFile)
+    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
     Utils.tryWithSafeFinally {
       // We take in lengths of each block, need to convert it to offsets.
       var offset = 0L
@@ -93,6 +154,31 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
     } {
       out.close()
     }

+    val dataFile = getDataFile(shuffleId, mapId)
+    // Note: there is only one IndexShuffleBlockResolver per executor
+    synchronized {

Review comment: Can you add a big comment explaining what is going on here? Also worth noting that there is only one …

+      val existedLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
+      if (existedLengths != null) {
+        // Use the lengths of the existing output for the MapStatus
+        System.arraycopy(existedLengths, 0, lengths, 0, lengths.length)
+        dataTmp.delete()

Review comment: though unlikely, will this throw an NPE?

+        indexTmp.delete()
+      } else {
+        if (indexFile.exists()) {
+          indexFile.delete()
+        }
+        if (dataFile.exists()) {
+          dataFile.delete()
+        }
+        if (!indexTmp.renameTo(indexFile)) {
+          throw new IOException("fail to rename data file " + indexTmp + " to " + indexFile)

Review comment: data -> index

+        }
+        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
+          throw new IOException("fail to rename data file " + dataTmp + " to " + dataFile)

Review comment: I don't think there is a particular flaw here, but it's a bit hard to follow since it's a mix of first-attempt-wins and last-attempt-wins: first attempt wins if there is a data file and an index file; last attempt wins if there is only an index file. The problem with last-attempt-wins is that the delete will fail on Windows if the file is open for reading, I believe. We can't fully get around that, because SPARK-4085 always requires us to delete some files that might be open, in which case we hope we don't run into this race again on the next retry; but it would be nice to minimize that case. We'd be closer to first-attempt-wins if we always wrote a dataFile, even an empty one when dataTmp == null. There is also an issue with MapStatus and non-deterministic data: it might not matter which output you get, but the MapStatus should be consistent with the data that is read. If attempt 1 writes non-empty outputs a, b, c, and attempt 2 writes non-empty outputs d, e, f (which are not committed), the reduce tasks might get the MapStatus for attempt 2, look for outputs d, e, f, and get nothing but empty blocks. Matei had suggested writing the MapStatus to a file, so that subsequent attempts always return the map status corresponding to the first successful attempt.

+        }

Review comment: should we check … what happens if this fails? Should we throw an exception?
Reply: This will slow down the normal path; I think it's not needed.

+      }
+    }
   }
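On the MapStatus-consistency point raised in the long review comment above, one possible direction (explicitly not part of this patch) is to persist the lengths of the first committed attempt so that later attempts always report a MapStatus matching the committed files. A rough sketch; the helper name and file format here are hypothetical:

  // Hypothetical sketch, not in this PR: record the block lengths of the first successful
  // commit next to the index file so that later attempts can report the same MapStatus.
  private def saveCommittedLengths(indexFile: File, lengths: Array[Long]): Unit = {
    val out = new DataOutputStream(new BufferedOutputStream(
      new FileOutputStream(new File(indexFile.getPath + ".lengths"))))
    try {
      lengths.foreach(len => out.writeLong(len))
    } finally {
      out.close()
    }
  }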

   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
HashShuffleWriter.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.shuffle.hash

+import java.io.IOException
+
 import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.scheduler.MapStatus
@@ -106,6 +108,28 @@ private[spark] class HashShuffleWriter[K, V](
       writer.commitAndClose()
       writer.fileSegment().length
     }
+    // rename all shuffle files to final paths
+    // Note: there is only one ShuffleBlockResolver per executor
+    shuffleBlockResolver.synchronized {
+      shuffle.writers.zipWithIndex.foreach { case (writer, i) =>
+        val output = blockManager.diskBlockManager.getFile(writer.blockId)
+        if (sizes(i) > 0) {
+          if (output.exists()) {
+            // update the size of output for MapStatus
+            sizes(i) = output.length()
+            writer.file.delete()
+          } else {
+            if (!writer.file.renameTo(output)) {
+              throw new IOException(s"fail to rename ${writer.file} to $output")
+            }
+          }
+        } else {
+          if (output.exists()) {
+            output.delete()
+          }
+        }
+      }
+    }
     MapStatus(blockManager.shuffleServerId, sizes)
   }
SortShuffleWriter.scala
@@ -20,8 +20,9 @@ package org.apache.spark.shuffle.sort
 import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle}
+import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver, ShuffleWriter}
 import org.apache.spark.storage.ShuffleBlockId
+import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.ExternalSorter

 private[spark] class SortShuffleWriter[K, V, C](
@@ -65,11 +66,11 @@ private[spark] class SortShuffleWriter[K, V, C](
     // Don't bother including the time to open the merged output file in the shuffle write time,
     // because it just opens a single file, so is typically too fast to measure accurately
     // (see SPARK-3570).
-    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
+    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
+    val tmp = Utils.tempFileWith(output)

Review comment: can you call these …
Reply: This is only one file.

     val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
-    val partitionLengths = sorter.writePartitionedFile(blockId, outputFile)
-    shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
+    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
+    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
     mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
   }
Review comment: why not use your new method Utils.withTempFile?
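Utils.tempFileWith is introduced by this PR, but its body is not part of the hunks shown above. A plausible sketch of such a helper, offered as an assumption about its behaviour rather than the actual implementation:

  import java.io.File
  import java.util.UUID

  // Sketch (assumption): return a sibling path of `file` with a random suffix, so that the
  // later rename onto `file` stays within the same directory and filesystem.
  def tempFileWith(file: File): File = {
    new File(file.getAbsolutePath + "." + UUID.randomUUID())
  }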