Commit acff637: merge master
mengxr committed Nov 4, 2014
2 parents: dba5ea7 + 2812815
Showing 36 changed files with 487 additions and 200 deletions.
core/src/main/scala/org/apache/spark/TaskEndReason.scala (6 changes: 4 additions & 2 deletions)
@@ -69,11 +69,13 @@ case class FetchFailed(
bmAddress: BlockManagerId, // Note that bmAddress can be null
shuffleId: Int,
mapId: Int,
reduceId: Int)
reduceId: Int,
message: String)
extends TaskFailedReason {
override def toErrorString: String = {
val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId)"
s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
s"message=\n$message\n)"
}
}

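For orientation, a minimal, self-contained sketch (not part of the commit) of what the new field buys. The case class below is a simplified stand-in that mirrors the extended FetchFailed from this hunk, with the BlockManagerId address replaced by a plain String so the snippet runs on its own.

// Hypothetical stand-in mirroring the extended FetchFailed above; the real class
// lives in org.apache.spark and takes a BlockManagerId (which may be null).
case class FetchFailedStandIn(
    bmAddress: String,
    shuffleId: Int,
    mapId: Int,
    reduceId: Int,
    message: String) {
  def toErrorString: String = {
    s"FetchFailed($bmAddress, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
      s"message=\n$message\n)"
  }
}

object FetchFailedDemo extends App {
  val reason = FetchFailedStandIn("exec-2@host-a:7337", shuffleId = 3, mapId = 5, reduceId = 0,
    message = "java.io.IOException: Connection reset by peer")
  // Logs and the web UI can now show *why* the fetch failed, not just which
  // shuffle/map/reduce triple failed.
  println(reason.toErrorString)
}
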
core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala (25 changes: 14 additions & 11 deletions)
@@ -211,20 +211,11 @@ class HadoopRDD[K, V](

val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
val jobConf = getJobConf()
val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener{ context => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()

val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
// Find a function that will return the FileSystem bytes read by this thread.
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
@@ -234,6 +225,18 @@
if (bytesReadCallback.isDefined) {
context.taskMetrics.inputMetrics = Some(inputMetrics)
}

var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener{ context => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()

var recordsSinceMetricsUpdate = 0

override def getNext() = {
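The new comment states the reason for the reordering: the bytes-read callback is obtained before the RecordReader is constructed because the constructor itself may read from the file system. A rough, self-contained sketch of that reasoning follows; threadBytesRead and RecordReaderStandIn are made-up names, and the baseline-subtraction behaviour is an assumption about how the real callback works, not a statement of its implementation.

// Illustrative only: why the callback must be captured before the reader exists.
object BytesReadOrderingSketch extends App {
  // Stand-in for the per-thread FileSystem read statistics.
  var threadBytesRead: Long = 0L

  class RecordReaderStandIn {
    // Like a real Hadoop RecordReader, construction may already read some bytes.
    threadBytesRead += 128
  }

  // Capture the callback (and its baseline) first ...
  val bytesReadCallback: () => Long = {
    val baseline = threadBytesRead
    () => threadBytesRead - baseline
  }
  // ... then build the reader, so its constructor-time reads are still counted.
  val reader = new RecordReaderStandIn
  println(s"bytes read by this task so far: ${bytesReadCallback()}")  // 128
}
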
core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala (26 changes: 14 additions & 12 deletions)
@@ -107,20 +107,10 @@ class NewHadoopRDD[K, V](
val split = theSplit.asInstanceOf[NewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value
val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
val format = inputFormatClass.newInstance
format match {
case configurable: Configurable =>
configurable.setConf(conf)
case _ =>
}
val reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
// Find a function that will return the FileSystem bytes read by this thread.
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
@@ -131,6 +121,18 @@
context.taskMetrics.inputMetrics = Some(inputMetrics)
}

val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
val format = inputFormatClass.newInstance
format match {
case configurable: Configurable =>
configurable.setConf(conf)
case _ =>
}
val reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener(context => close())
var havePair = false
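As a side note, the unchanged context lines above use the usual Hadoop new-API idiom: an InputFormat created reflectively gets no Configuration through its constructor, so the caller probes for Configurable and injects the conf by hand. A small sketch of that idiom, assuming hadoop-common on the classpath; NeedsConf is a made-up class used only for illustration.

import org.apache.hadoop.conf.{Configurable, Configuration}

// A format-like class that can only receive its configuration via setConf.
class NeedsConf extends Configurable {
  private var conf: Configuration = _
  override def setConf(c: Configuration): Unit = { conf = c }
  override def getConf: Configuration = conf
}

object ConfigurableIdiomSketch extends App {
  val conf = new Configuration()
  val format: AnyRef = new NeedsConf         // imagine inputFormatClass.newInstance here
  format match {
    case c: Configurable => c.setConf(conf)  // inject the conf before first use
    case _ =>                                // formats that do not need it are left alone
  }
}
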
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1053,7 +1053,7 @@ class DAGScheduler(
logInfo("Resubmitted " + task + ", so marking it as still running")
stage.pendingTasks += task

case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleToMapStage(shuffleId)

@@ -1063,7 +1063,7 @@
if (runningStages.contains(failedStage)) {
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
s"due to a fetch failure from $mapStage (${mapStage.name})")
markStageAsFinished(failedStage, Some("Fetch failure"))
markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
runningStages -= failedStage
}

core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -215,7 +215,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
" STAGE_ID=" + taskEnd.stageId
stageLogInfo(taskEnd.stageId, taskStatus)
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
case FetchFailed(bmAddress, shuffleId, mapId, reduceId, message) =>
taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
taskEnd.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
mapId + " REDUCE_ID=" + reduceId
core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -19,6 +19,7 @@ package org.apache.spark.shuffle

import org.apache.spark.storage.BlockManagerId
import org.apache.spark.{FetchFailed, TaskEndReason}
import org.apache.spark.util.Utils

/**
* Failed to fetch a shuffle block. The executor catches this exception and propagates it
@@ -30,13 +31,11 @@ private[spark] class FetchFailedException(
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
reduceId: Int)
extends Exception {

override def getMessage: String =
"Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)
reduceId: Int,
message: String)
extends Exception(message) {

def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
}

/**
@@ -46,7 +45,4 @@ private[spark] class MetadataFetchFailedException(
shuffleId: Int,
reduceId: Int,
message: String)
extends FetchFailedException(null, shuffleId, -1, reduceId) {

override def getMessage: String = message
}
extends FetchFailedException(null, shuffleId, -1, reduceId, message)
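A condensed, self-contained sketch of the simplification in this file: since the detail message is now passed to Exception's constructor, neither class needs to override getMessage. The stand-ins below mirror the hunk but take a plain String address instead of a BlockManagerId so the snippet compiles on its own.

// Hypothetical stand-ins for the two exception classes above.
class FetchFailedExceptionStandIn(
    bmAddress: String,
    shuffleId: Int,
    mapId: Int,
    reduceId: Int,
    message: String)
  extends Exception(message)  // getMessage is inherited from Exception

class MetadataFetchFailedExceptionStandIn(shuffleId: Int, reduceId: Int, message: String)
  extends FetchFailedExceptionStandIn(null, shuffleId, -1, reduceId, message)

object FetchFailedExceptionDemo extends App {
  val e = new MetadataFetchFailedExceptionStandIn(2, 0, "Missing an output location for shuffle 2")
  println(e.getMessage)  // the message survives without any getMessage override
}
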
core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -19,12 +19,13 @@ package org.apache.spark.shuffle.hash

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.util.{Failure, Success, Try}

import org.apache.spark._
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.{CompletionIterator, Utils}

private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
@@ -52,21 +53,22 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
(address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
}

def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
val blockId = blockPair._1
val blockOption = blockPair._2
blockOption match {
case Some(block) => {
case Success(block) => {
block.asInstanceOf[Iterator[T]]
}
case None => {
case Failure(e) => {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
Utils.exceptionString(e))
case _ =>
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block")
"Failed to get block " + blockId + ", which is not a shuffle block", e)
}
}
}
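A minimal sketch of the Option-to-Try migration this file adopts, with made-up block ids and a generic RuntimeException standing in for FetchFailedException: a Failure carries the original Throwable, so the rethrown error can preserve the root cause instead of reducing it to "block missing".

import scala.util.{Failure, Success, Try}

object TryUnpackSketch extends App {
  // Before: (blockId, Option[Iterator[Any]]) said *that* a fetch failed, not *why*.
  // After:  (blockId, Try[Iterator[Any]])    carries the underlying Throwable.
  def unpack[T](blockPair: (String, Try[Iterator[T]])): Iterator[T] = blockPair match {
    case (_, Success(iter)) => iter
    case (blockId, Failure(e)) =>
      // The real code throws FetchFailedException with Utils.exceptionString(e);
      // a plain wrapper keeps this sketch self-contained.
      throw new RuntimeException(s"Failed to fetch $blockId", e)
  }

  val ok  = ("shuffle_0_1_2", Success(Iterator(1, 2, 3)))
  val bad = ("shuffle_0_9_2", Failure(new java.io.IOException("connection reset")))
  println(unpack(ok).toList)                     // List(1, 2, 3)
  println(Try(unpack(bad)).failed.get.getCause)  // java.io.IOException: connection reset
}
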
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.util.concurrent.LinkedBlockingQueue

import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
import scala.util.{Failure, Success, Try}

import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.network.BlockTransferService
@@ -55,7 +56,7 @@
blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
serializer: Serializer,
maxBytesInFlight: Long)
extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
extends Iterator[(BlockId, Try[Iterator[Any]])] with Logging {

import ShuffleBlockFetcherIterator._

@@ -118,16 +119,18 @@
private[this] def cleanup() {
isZombie = true
// Release the current buffer if necessary
if (currentResult != null && !currentResult.failed) {
currentResult.buf.release()
currentResult match {
case SuccessFetchResult(_, _, buf) => buf.release()
case _ =>
}

// Release buffers in the results queue
val iter = results.iterator()
while (iter.hasNext) {
val result = iter.next()
if (!result.failed) {
result.buf.release()
result match {
case SuccessFetchResult(_, _, buf) => buf.release()
case _ =>
}
}
}
@@ -151,7 +154,7 @@
// Increment the ref count because we need to pass this to a different thread.
// This needs to be released after use.
buf.retain()
results.put(new FetchResult(BlockId(blockId), sizeMap(blockId), buf))
results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf))
shuffleMetrics.remoteBytesRead += buf.size
shuffleMetrics.remoteBlocksFetched += 1
}
@@ -160,7 +163,7 @@

override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
results.put(new FetchResult(BlockId(blockId), -1, null))
results.put(new FailureFetchResult(BlockId(blockId), e))
}
}
)
@@ -231,12 +234,12 @@
val buf = blockManager.getBlockData(blockId)
shuffleMetrics.localBlocksFetched += 1
buf.retain()
results.put(new FetchResult(blockId, 0, buf))
results.put(new SuccessFetchResult(blockId, 0, buf))
} catch {
case e: Exception =>
// If we see an exception, stop immediately.
logError(s"Error occurred while fetching local blocks", e)
results.put(new FetchResult(blockId, -1, null))
results.put(new FailureFetchResult(blockId, e))
return
}
}
@@ -267,36 +270,39 @@

override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

override def next(): (BlockId, Option[Iterator[Any]]) = {
override def next(): (BlockId, Try[Iterator[Any]]) = {
numBlocksProcessed += 1
val startFetchWait = System.currentTimeMillis()
currentResult = results.take()
val result = currentResult
val stopFetchWait = System.currentTimeMillis()
shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)
if (!result.failed) {
bytesInFlight -= result.size

result match {
case SuccessFetchResult(_, size, _) => bytesInFlight -= size
case _ =>
}
// Send fetch requests up to maxBytesInFlight
while (fetchRequests.nonEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}

val iteratorOpt: Option[Iterator[Any]] = if (result.failed) {
None
} else {
val is = blockManager.wrapForCompression(result.blockId, result.buf.createInputStream())
val iter = serializer.newInstance().deserializeStream(is).asIterator
Some(CompletionIterator[Any, Iterator[Any]](iter, {
// Once the iterator is exhausted, release the buffer and set currentResult to null
// so we don't release it again in cleanup.
currentResult = null
result.buf.release()
}))
val iteratorTry: Try[Iterator[Any]] = result match {
case FailureFetchResult(_, e) => Failure(e)
case SuccessFetchResult(blockId, _, buf) => {
val is = blockManager.wrapForCompression(blockId, buf.createInputStream())
val iter = serializer.newInstance().deserializeStream(is).asIterator
Success(CompletionIterator[Any, Iterator[Any]](iter, {
// Once the iterator is exhausted, release the buffer and set currentResult to null
// so we don't release it again in cleanup.
currentResult = null
buf.release()
}))
}
}

(result.blockId, iteratorOpt)
(result.blockId, iteratorTry)
}
}

@@ -315,14 +321,30 @@ object ShuffleBlockFetcherIterator {
}

/**
* Result of a fetch from a remote block. A failure is represented as size == -1.
* Result of a fetch from a remote block.
*/
private[storage] sealed trait FetchResult {
val blockId: BlockId
}

/**
* Result of a fetch from a remote block successfully.
* @param blockId block id
* @param size estimated size of the block, used to calculate bytesInFlight.
* Note that this is NOT the exact bytes. -1 if failure is present.
* @param buf [[ManagedBuffer]] for the content. null is error.
* Note that this is NOT the exact bytes.
* @param buf [[ManagedBuffer]] for the content.
*/
case class FetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer) {
def failed: Boolean = size == -1
if (failed) assert(buf == null) else assert(buf != null)
private[storage] case class SuccessFetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer)
extends FetchResult {
require(buf != null)
require(size >= 0)
}

/**
* Result of a fetch from a remote block unsuccessfully.
* @param blockId block id
* @param e the failure exception
*/
private[storage] case class FailureFetchResult(blockId: BlockId, e: Throwable)
extends FetchResult
}
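To make the refactor concrete, a condensed, runnable sketch of the sealed FetchResult hierarchy introduced above, with ManagedBuffer stood in by a plain String: call sites match on the case classes instead of probing size == -1 or a null buffer, and the compiler can flag unhandled result types.

object FetchResultSketch extends App {
  // Stand-in for org.apache.spark.network.buffer.ManagedBuffer.
  type ManagedBufferStandIn = String

  sealed trait FetchResult { val blockId: String }

  case class SuccessFetchResult(blockId: String, size: Long, buf: ManagedBufferStandIn)
      extends FetchResult {
    require(buf != null)
    require(size >= 0)
  }

  case class FailureFetchResult(blockId: String, e: Throwable) extends FetchResult

  // The size == -1 / buf == null encoding is gone; the type says which case we are in.
  def bytesInFlightDelta(r: FetchResult): Long = r match {
    case SuccessFetchResult(_, size, _) => size
    case _: FailureFetchResult          => 0L
  }

  println(bytesInFlightDelta(SuccessFetchResult("shuffle_0_1_2", 4096L, "<buf>")))        // 4096
  println(bytesInFlightDelta(FailureFetchResult("shuffle_0_9_2", new Exception("boom")))) // 0
}
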
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala (7 changes: 5 additions & 2 deletions)
@@ -279,7 +279,8 @@ private[spark] object JsonProtocol {
("Block Manager Address" -> blockManagerAddress) ~
("Shuffle ID" -> fetchFailed.shuffleId) ~
("Map ID" -> fetchFailed.mapId) ~
("Reduce ID" -> fetchFailed.reduceId)
("Reduce ID" -> fetchFailed.reduceId) ~
("Message" -> fetchFailed.message)
case exceptionFailure: ExceptionFailure =>
val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
val metrics = exceptionFailure.metrics.map(taskMetricsToJson).getOrElse(JNothing)
@@ -629,7 +630,9 @@ private[spark] object JsonProtocol {
val shuffleId = (json \ "Shuffle ID").extract[Int]
val mapId = (json \ "Map ID").extract[Int]
val reduceId = (json \ "Reduce ID").extract[Int]
new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId)
val message = Utils.jsonOption(json \ "Message").map(_.extract[String])
new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId,
message.getOrElse("Unknown reason"))
case `exceptionFailure` =>
val className = (json \ "Class Name").extract[String]
val description = (json \ "Description").extract[String]
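A hedged sketch of the compatibility concern behind the getOrElse("Unknown reason") fallback: event logs written before this change carry no "Message" field, so deserialization must tolerate its absence. The snippet uses the json4s API directly (extractOpt) rather than Spark's Utils.jsonOption helper, and the artifact and version assumptions may differ from Spark's own build.

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object FetchFailedJsonSketch extends App {
  implicit val formats: Formats = DefaultFormats

  // Extract the optional "Message" field, falling back for logs written before this change.
  def messageOf(json: JValue): String =
    (json \ "Message").extractOpt[String].getOrElse("Unknown reason")

  val newLog = parse("""{"Reduce ID": 1, "Message": "java.io.IOException: broken pipe"}""")
  val oldLog = parse("""{"Reduce ID": 1}""")

  println(messageOf(newLog))  // java.io.IOException: broken pipe
  println(messageOf(oldLog))  // Unknown reason
}
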
core/src/main/scala/org/apache/spark/util/Utils.scala (2 changes: 1 addition & 1 deletion)
@@ -1597,7 +1597,7 @@ private[spark] object Utils extends Logging {
}

/** Return a nice string representation of the exception, including the stack trace. */
def exceptionString(e: Exception): String = {
def exceptionString(e: Throwable): String = {
if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
}

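Widening the parameter from Exception to Throwable matters because the shuffle fetch path now hands over whatever Throwable a Failure carries, including Errors. A tiny, illustrative stand-in formatter (not the real Utils implementation) makes the point:

object ExceptionStringSketch extends App {
  // Simplified formatter with the widened signature; the real method delegates to an
  // overload that formats class name, message, and stack trace.
  def exceptionString(e: Throwable): String =
    if (e == null) ""
    else s"${e.getClass.getName}: ${e.getMessage}\n" +
      e.getStackTrace.map(elem => s"        at $elem").mkString("\n")

  // Both an Exception and an Error can be formatted now.
  println(exceptionString(new java.io.IOException("connection reset")))
  println(exceptionString(new NoClassDefFoundError("org/example/Missing")))
}
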
(Diffs for the remaining changed files are not loaded in this view.)
