[SPARK-8029] Robust shuffle writer
Currently, all of the shuffle writers write to the target path directly, so the file can be corrupted by another attempt of the same partition on the same executor. They should instead write to a temporary file and then rename it to the target path, as we do in the output committer. To make the rename atomic, the temporary file should be created in the same local directory (the same FileSystem).

This PR is based on #9214, thanks to squito. Closes #9214

Author: Davies Liu <davies@databricks.com>

Closes #9610 from davies/safe_shuffle.
Davies Liu authored and davies committed Nov 13, 2015
1 parent ea5ae27 commit ad96088
Showing 16 changed files with 402 additions and 52 deletions.
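To make the pattern described in the commit message concrete, here is a minimal standalone sketch of writing to a temporary file in the same directory and then renaming it into place. It is illustrative only, not code from this PR: the object name, the tempFileNextTo helper, and the error handling are assumptions; the diff below uses Spark's own Utils.tempFileWith helper plus the block resolvers' commit logic instead.

import java.io.{File, FileOutputStream, IOException}

// Minimal sketch (illustrative, not the PR's code): publish a file atomically by
// writing to a temporary file in the SAME directory and renaming it into place.
object AtomicOutputSketch {

  // Create the temp file beside the target so the rename stays on one filesystem
  // and cannot silently degrade into a copy.
  def tempFileNextTo(target: File): File =
    File.createTempFile(target.getName, ".tmp", target.getParentFile)

  def writeAtomically(target: File)(write: FileOutputStream => Unit): Unit = {
    val tmp = tempFileNextTo(target)
    val out = new FileOutputStream(tmp)
    try write(out) finally out.close()
    // Replace any half-written or stale file left by an earlier attempt.
    if (target.exists() && !target.delete()) {
      throw new IOException(s"failed to delete existing file $target")
    }
    if (!tmp.renameTo(target)) {
      throw new IOException(s"failed to rename $tmp to $target")
    }
  }
}

Keeping the temporary file beside the target guarantees both paths live on the same local filesystem, so the rename is a metadata operation rather than a copy and readers never observe a partially written file, which is exactly why the commit message insists on the same local directory.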
@@ -125,7 +125,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
}
@@ -155,9 +155,10 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
writer.commitAndClose();
}

partitionLengths =
writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId));
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
File tmp = Utils.tempFileWith(output);
partitionLengths = writePartitionedFile(tmp);
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

@@ -41,7 +41,7 @@
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.io.CompressionCodec$;
import org.apache.spark.io.LZFCompressionCodec;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
@@ -53,7 +53,7 @@
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;

@Private
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@@ -206,16 +206,18 @@ void closeAndWriteOutput() throws IOException {
final SpillInfo[] spills = sorter.closeAndGetSpills();
sorter = null;
final long[] partitionLengths;
final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
final File tmp = Utils.tempFileWith(output);
try {
partitionLengths = mergeSpills(spills);
partitionLengths = mergeSpills(spills, tmp);
} finally {
for (SpillInfo spill : spills) {
if (spill.file.exists() && ! spill.file.delete()) {
logger.error("Error while deleting spill file {}", spill.file.getPath());
}
}
}
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

@@ -248,8 +250,7 @@ void forceSorterToSpill() throws IOException {
*
* @return the partition lengths in the merged file.
*/
private long[] mergeSpills(SpillInfo[] spills) throws IOException {
final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
final boolean fastMergeEnabled =
@@ -21,13 +21,13 @@ import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

import org.apache.spark.{Logging, SparkConf, SparkEnv}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage._
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
import org.apache.spark.{Logging, SparkConf, SparkEnv}

/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
@@ -84,17 +84,8 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockFile = blockManager.diskBlockManager.getFile(blockId)
// Because of previous failures, the shuffle file may already exist on this machine.
// If so, remove it.
if (blockFile.exists) {
if (blockFile.delete()) {
logInfo(s"Removed existing shuffle file $blockFile")
} else {
logWarning(s"Failed to remove existing shuffle file $blockFile")
}
}
blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize,
writeMetrics)
val tmp = Utils.tempFileWith(blockFile)
blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize, writeMetrics)
}
}
// Creating the file to write to and creating a disk writer both involve interacting with
@@ -21,13 +21,12 @@ import java.io._

import com.google.common.io.ByteStreams

import org.apache.spark.{SparkConf, SparkEnv, Logging}
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage._
import org.apache.spark.util.Utils

import IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.{SparkEnv, Logging, SparkConf}

/**
* Create and maintain the shuffle blocks' mapping between logic block and physical file location.
@@ -40,10 +39,13 @@ import IndexShuffleBlockResolver.NOOP_REDUCE_ID
*/
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver
private[spark] class IndexShuffleBlockResolver(
conf: SparkConf,
_blockManager: BlockManager = null)
extends ShuffleBlockResolver
with Logging {

private lazy val blockManager = SparkEnv.get.blockManager
private lazy val blockManager = Option(_blockManager).getOrElse(SparkEnv.get.blockManager)

private val transportConf = SparkTransportConf.fromSparkConf(conf)

@@ -74,14 +76,69 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
}
}

/**
* Check whether the given index and data files match each other.
* If so, return the partition lengths in the data file. Otherwise return null.
*/
private def checkIndexAndDataFile(index: File, data: File, blocks: Int): Array[Long] = {
// the index file should have `block + 1` longs as offset.
if (index.length() != (blocks + 1) * 8) {
return null
}
val lengths = new Array[Long](blocks)
// Read the lengths of blocks
val in = try {
new DataInputStream(new BufferedInputStream(new FileInputStream(index)))
} catch {
case e: IOException =>
return null
}
try {
// Convert the offsets into lengths of each block
var offset = in.readLong()
if (offset != 0L) {
return null
}
var i = 0
while (i < blocks) {
val off = in.readLong()
lengths(i) = off - offset
offset = off
i += 1
}
} catch {
case e: IOException =>
return null
} finally {
in.close()
}

// the size of data file should match with index file
if (data.length() == lengths.sum) {
lengths
} else {
null
}
}

/**
* Write an index file with the offsets of each block, plus a final offset at the end for the
* end of the output file. This will be used by getBlockData to figure out where each block
* begins and ends.
*
* It will commit the data and index file as an atomic operation, use the existing ones, or
* replace them with new ones.
*
* Note: the `lengths` will be updated to match the existing index file if use the existing ones.
* */
def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
def writeIndexFileAndCommit(
shuffleId: Int,
mapId: Int,
lengths: Array[Long],
dataTmp: File): Unit = {
val indexFile = getIndexFile(shuffleId, mapId)
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
val indexTmp = Utils.tempFileWith(indexFile)
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
@@ -93,6 +150,37 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
} {
out.close()
}

val dataFile = getDataFile(shuffleId, mapId)
// There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
// the following check and rename are atomic.
synchronized {
val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
if (existingLengths != null) {
// Another attempt for the same task has already written our map outputs successfully,
// so just use the existing partition lengths and delete our temporary map outputs.
System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
if (dataTmp != null && dataTmp.exists()) {
dataTmp.delete()
}
indexTmp.delete()
} else {
// This is the first successful attempt in writing the map outputs for this task,
// so override any existing index and data files with the ones we wrote.
if (indexFile.exists()) {
indexFile.delete()
}
if (dataFile.exists()) {
dataFile.delete()
}
if (!indexTmp.renameTo(indexFile)) {
throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
}
if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
}
}
}
}

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
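For orientation, the sketch below restates, in a simplified standalone form with illustrative names, the invariant that checkIndexAndDataFile verifies and that writeIndexFileAndCommit relies on: the index file stores blocks + 1 offsets as longs, starting at 0, so each block's length is the difference between consecutive offsets, and a matching data file must be exactly as long as the final offset. This is a sketch under those assumptions, not Spark's code.

import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream,
  DataOutputStream, File, FileInputStream, FileOutputStream}

// Simplified sketch of the index-file format used above (names are illustrative).
object IndexFormatSketch {

  // Write `lengths.length + 1` offsets: 0 followed by a running total of block lengths.
  def writeIndex(index: File, lengths: Array[Long]): Unit = {
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(index)))
    try {
      var offset = 0L
      out.writeLong(offset)
      for (len <- lengths) { offset += len; out.writeLong(offset) }
    } finally {
      out.close()
    }
  }

  // Recover block lengths and confirm they are consistent with the data file size,
  // mirroring the shape of checkIndexAndDataFile (which returns null on any mismatch).
  def readAndCheck(index: File, dataLength: Long, blocks: Int): Option[Array[Long]] = {
    if (index.length() != (blocks + 1) * 8L) return None
    val in = new DataInputStream(new BufferedInputStream(new FileInputStream(index)))
    try {
      var offset = in.readLong()
      if (offset != 0L) return None
      val lengths = Array.tabulate(blocks) { _ =>
        val next = in.readLong()
        val len = next - offset
        offset = next
        len
      }
      if (offset == dataLength) Some(lengths) else None
    } finally {
      in.close()
    }
  }
}

Under the resolver-wide synchronized block, this check is what makes the commit idempotent across speculative or retried attempts: the first attempt to publish a consistent index/data pair wins, and any later attempt adopts its lengths and deletes its own temporary files.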
@@ -17,6 +17,8 @@

package org.apache.spark.shuffle.hash

import java.io.IOException

import org.apache.spark._
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.scheduler.MapStatus
@@ -106,6 +108,29 @@ private[spark] class HashShuffleWriter[K, V](
writer.commitAndClose()
writer.fileSegment().length
}
// rename all shuffle files to final paths
// Note: there is only one ShuffleBlockResolver in executor
shuffleBlockResolver.synchronized {
shuffle.writers.zipWithIndex.foreach { case (writer, i) =>
val output = blockManager.diskBlockManager.getFile(writer.blockId)
if (sizes(i) > 0) {
if (output.exists()) {
// Use length of existing file and delete our own temporary one
sizes(i) = output.length()
writer.file.delete()
} else {
// Commit by renaming our temporary file to something the fetcher expects
if (!writer.file.renameTo(output)) {
throw new IOException(s"fail to rename ${writer.file} to $output")
}
}
} else {
if (output.exists()) {
output.delete()
}
}
}
}
MapStatus(blockManager.shuffleServerId, sizes)
}
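The hash-based shuffle has no index file, so each per-reducer output above is committed individually while holding the resolver lock. Below is a condensed sketch of that commit step; it is illustrative only, and the lock object, the (tmp, output) pairs, and the sizes array are stand-ins for the writer state shown in the diff.

import java.io.{File, IOException}

// Sketch: commit one map task's per-reducer outputs. If an earlier attempt has
// already published a file, adopt its length; otherwise rename our temp file into place.
object HashCommitSketch {
  def commitOutputs(lock: AnyRef, outputs: Seq[(File, File)], sizes: Array[Long]): Unit =
    lock.synchronized {
      outputs.zipWithIndex.foreach { case ((tmp, output), i) =>
        if (sizes(i) > 0) {
          if (output.exists()) {
            sizes(i) = output.length()   // another attempt won; report its length
            tmp.delete()
          } else if (!tmp.renameTo(output)) {
            throw new IOException(s"fail to rename $tmp to $output")
          }
        } else if (output.exists()) {
          output.delete()                // empty partition: remove any stale file
        }
      }
    }
}

Serializing on the single per-executor ShuffleBlockResolver (as the diff's comment notes) keeps the exists/rename pair race-free between concurrent attempts of the same task.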

@@ -20,8 +20,9 @@ package org.apache.spark.shuffle.sort
import org.apache.spark._
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle}
import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver, ShuffleWriter}
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.ExternalSorter

private[spark] class SortShuffleWriter[K, V, C](
@@ -65,11 +66,11 @@ private[spark] class SortShuffleWriter[K, V, C](
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, outputFile)
shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}

@@ -21,10 +21,10 @@ import java.io._
import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{ExecutionContext, Await, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Random
import scala.util.control.NonFatal

import sun.nio.ch.DirectBuffer

@@ -38,9 +38,8 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExternalShuffleClient
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.serializer.{SerializerInstance, Serializer}
import org.apache.spark.serializer.{Serializer, SerializerInstance}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.util._

private[spark] sealed trait BlockValues
@@ -660,7 +659,7 @@ private[spark] class BlockManager(
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
new DiskBlockObjectWriter(file, serializerInstance, bufferSize, compressStream,
syncWrites, writeMetrics)
syncWrites, writeMetrics, blockId)
}

/**
@@ -34,14 +34,15 @@ import org.apache.spark.util.Utils
* reopened again.
*/
private[spark] class DiskBlockObjectWriter(
file: File,
val file: File,
serializerInstance: SerializerInstance,
bufferSize: Int,
compressStream: OutputStream => OutputStream,
syncWrites: Boolean,
// These write metrics concurrently shared with other active DiskBlockObjectWriters who
// are themselves performing writes. All updates must be relative.
writeMetrics: ShuffleWriteMetrics)
writeMetrics: ShuffleWriteMetrics,
val blockId: BlockId = null)
extends OutputStream
with Logging {

(The remaining changed files in this commit are not shown here.)