Merge remote-tracking branch 'upstream/master' into dt-timing
jkbradley committed Aug 8, 2014
2 parents bcf874a + 9a54de1 commit f61e9d2
Showing 21 changed files with 541 additions and 212 deletions.
2 changes: 1 addition & 1 deletion bin/spark-sql
@@ -29,7 +29,7 @@ CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
FWDIR="$(cd `dirname $0`/..; pwd)"

function usage {
echo "Usage: ./sbin/spark-sql [options] [cli option]"
echo "Usage: ./bin/spark-sql [options] [cli option]"
pattern="usage"
pattern+="\|Spark assembly has been built with Hive"
pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
27 changes: 18 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,11 +156,9 @@ object SparkEnv extends Logging {
conf.set("spark.driver.port", boundPort.toString)
}

// Create an instance of the class named by the given Java system property, or by
// defaultClassName if the property is not set, and return it as a T
def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
val name = conf.get(propertyName, defaultClassName)
val cls = Class.forName(name, true, Utils.getContextOrSparkClassLoader)
// Create an instance of the class with the given name, possibly initializing it with our conf
def instantiateClass[T](className: String): T = {
val cls = Class.forName(className, true, Utils.getContextOrSparkClassLoader)
// Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
// SparkConf, then one taking no arguments
try {
@@ -178,11 +176,17 @@ }
}
}

val serializer = instantiateClass[Serializer](
// Create an instance of the class named by the given SparkConf property, or defaultClassName
// if the property is not set, possibly initializing it with our conf
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
instantiateClass[T](conf.get(propertyName, defaultClassName))
}

val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

val closureSerializer = instantiateClass[Serializer](
val closureSerializer = instantiateClassFromConf[Serializer](
"spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
@@ -246,8 +250,13 @@
"."
}

val shuffleManager = instantiateClass[ShuffleManager](
"spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val shuffleMemoryManager = new ShuffleMemoryManager(conf)

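Note on the SparkEnv change above: serializers are still configured by fully qualified class name through instantiateClassFromConf, while the shuffle manager can now also be selected by the short names "hash" or "sort". A minimal sketch of the resulting user-facing configuration follows; the SparkConf calls are the standard ones, but the snippet itself is an illustration, not part of this commit.

  import org.apache.spark.SparkConf

  // Short name resolved through shortShuffleMgrNames; a fully qualified
  // class name would also be accepted for spark.shuffle.manager.
  val conf = new SparkConf()
    .set("spark.shuffle.manager", "sort")
    .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")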
@@ -219,7 +219,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {

/** Fill in values by parsing user options. */
private def parseOpts(opts: Seq[String]): Unit = {
var inSparkOpts = true
val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r

// Delineates parsing of Spark options from parsing of user options.
@@ -374,6 +374,7 @@ private[spark] class Executor(
for (taskRunner <- runningTasks.values()) {
if (!taskRunner.attemptedTask.isEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
tasksMetrics += ((taskRunner.taskId, metrics))
}
}
55 changes: 42 additions & 13 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,8 @@

package org.apache.spark.executor

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.{BlockId, BlockStatus}

@@ -81,12 +83,27 @@ class TaskMetrics extends Serializable {
var inputMetrics: Option[InputMetrics] = None

/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
* This includes read metrics aggregated over all the task's shuffle dependencies.
*/
private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None

def shuffleReadMetrics = _shuffleReadMetrics

/**
* This should only be used when recreating TaskMetrics, not when updating read metrics in
* executors.
*/
private[spark] def setShuffleReadMetrics(shuffleReadMetrics: Option[ShuffleReadMetrics]) {
_shuffleReadMetrics = shuffleReadMetrics
}

/**
* ShuffleReadMetrics per dependency for collecting independently while task is in progress.
*/
@transient private lazy val depsShuffleReadMetrics: ArrayBuffer[ShuffleReadMetrics] =
new ArrayBuffer[ShuffleReadMetrics]()

/**
* If this task writes to shuffle output, metrics on the written shuffle data will be collected
* here
@@ -98,19 +115,31 @@
*/
var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None

/** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
_shuffleReadMetrics match {
case Some(existingMetrics) =>
existingMetrics.shuffleFinishTime = math.max(
existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
case None =>
_shuffleReadMetrics = Some(newMetrics)
/**
* A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
* issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each
* dependency, and merge these metrics before reporting them to the driver. This method returns
* a ShuffleReadMetrics for a dependency and registers it for merging later.
*/
private [spark] def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized {
val readMetrics = new ShuffleReadMetrics()
depsShuffleReadMetrics += readMetrics
readMetrics
}

/**
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
*/
private[spark] def updateShuffleReadMetrics() = synchronized {
val merged = new ShuffleReadMetrics()
for (depMetrics <- depsShuffleReadMetrics) {
merged.fetchWaitTime += depMetrics.fetchWaitTime
merged.localBlocksFetched += depMetrics.localBlocksFetched
merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched
merged.remoteBytesRead += depMetrics.remoteBytesRead
merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime)
}
_shuffleReadMetrics = Some(merged)
}
}

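To make the new TaskMetrics flow concrete, here is a hedged sketch, not part of the commit: each shuffle dependency gets its own ShuffleReadMetrics to mutate without locking, and the per-dependency values are merged into the reported aggregate just before metrics leave the executor. Both helpers are private[spark], so this assumes code living inside the org.apache.spark package.

  // Illustrative only: a task with two shuffle dependencies.
  val taskMetrics = new TaskMetrics()
  val dep0 = taskMetrics.createShuffleReadMetricsForDependency()
  val dep1 = taskMetrics.createShuffleReadMetricsForDependency()
  dep0.remoteBytesRead += 1024       // updated by the reader for dependency 0
  dep1.localBlocksFetched += 2       // updated by the reader for dependency 1

  // Called before reporting, e.g. from the executor heartbeat shown above:
  taskMetrics.updateShuffleReadMetrics()
  assert(taskMetrics.shuffleReadMetrics.get.remoteBytesRead == 1024)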
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1233,6 +1233,11 @@ abstract class RDD[T: ClassTag](
dependencies.head.rdd.asInstanceOf[RDD[U]]
}

/** Returns the jth parent RDD: e.g. rdd.parent[T](0) is equivalent to rdd.firstParent[T] */
protected[spark] def parent[U: ClassTag](j: Int) = {
dependencies(j).rdd.asInstanceOf[RDD[U]]
}

/** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
def context = sc

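A short sketch of the new parent helper, using a hypothetical subclass rather than anything in this commit: since parent is protected[spark], code like this would sit alongside Spark's own RDD implementations, and parent[A](0) is interchangeable with firstParent[A].

  import scala.reflect.ClassTag
  import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
  import org.apache.spark.rdd.RDD

  // Illustrative two-parent RDD; assumes both parents are identically partitioned.
  class PairedParentsRDD[A: ClassTag, B: ClassTag](sc: SparkContext, left: RDD[A], right: RDD[B])
    extends RDD[(A, B)](sc, Seq(new OneToOneDependency(left), new OneToOneDependency(right))) {

    private def leftParent: RDD[A] = parent[A](0)    // same RDD as firstParent[A]
    private def rightParent: RDD[B] = parent[B](1)   // second dependency, by position

    override def getPartitions: Array[Partition] = leftParent.partitions

    override def compute(split: Partition, context: TaskContext): Iterator[(A, B)] =
      leftParent.iterator(split, context).zip(rightParent.iterator(split, context))
  }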
@@ -32,7 +32,8 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
shuffleId: Int,
reduceId: Int,
context: TaskContext,
serializer: Serializer)
serializer: Serializer,
shuffleMetrics: ShuffleReadMetrics)
: Iterator[T] =
{
logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
@@ -73,17 +74,11 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
}
}

val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer, shuffleMetrics)
val itr = blockFetcherItr.flatMap(unpackBlock)

val completionIter = CompletionIterator[T, Iterator[T]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics)
context.taskMetrics.updateShuffleReadMetrics()
})

new InterruptibleIterator[T](context, completionIter)
@@ -36,8 +36,10 @@ private[spark] class HashShuffleReader[K, C](

/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
val ser = Serializer.getSerializer(dep.serializer)
val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser,
readMetrics)

val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
@@ -58,7 +60,7 @@
// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
// the ExternalSorter won't spill to disk.
val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
sorter.write(aggregatedIter)
sorter.insertAll(aggregatedIter)
context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
sorter.iterator
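The HashShuffleReader change above also picks up ExternalSorter's renamed entry point: data is fed in with insertAll instead of write, then read back with iterator (the map side now goes through writePartitionedFile instead, as in SortShuffleWriter below). A minimal sketch of the sort-only pattern used here, assuming it runs inside Spark with a serializer ser, a TaskContext context, and a records iterator already in hand:

  // Illustrative: no aggregator or partitioner, only a total order by key,
  // mirroring the keyOrdering branch above.
  val sorter = new ExternalSorter[String, Int, Int](
    ordering = Some(Ordering[String]), serializer = Some(ser))
  sorter.insertAll(records)                                // formerly sorter.write(records)
  val sorted: Iterator[Product2[String, Int]] = sorter.iterator
  context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
  context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled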
@@ -44,6 +44,7 @@ private[spark] class SortShuffleWriter[K, V, C](

private var sorter: ExternalSorter[K, V, _] = null
private var outputFile: File = null
private var indexFile: File = null

// Are we in the process of stopping? Because map tasks can call stop() with success = true
// and then call stop() with success = false if they get an exception, we want to make sure
@@ -57,78 +58,36 @@

/** Write a bunch of records to this task's output */
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
// Get an iterator with the elements for each partition ID
val partitions: Iterator[(Int, Iterator[Product2[K, _]])] = {
if (dep.mapSideCombine) {
if (!dep.aggregator.isDefined) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
}
sorter = new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.write(records)
sorter.partitionedIterator
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we
// don't care whether the keys get sorted in each partition; that will be done on the
// reduce side if the operation being run is sortByKey.
sorter = new ExternalSorter[K, V, V](
None, Some(dep.partitioner), None, dep.serializer)
sorter.write(records)
sorter.partitionedIterator
if (dep.mapSideCombine) {
if (!dep.aggregator.isDefined) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
}
sorter = new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.insertAll(records)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
sorter = new ExternalSorter[K, V, V](
None, Some(dep.partitioner), None, dep.serializer)
sorter.insertAll(records)
}

// Create a single shuffle file with reduce ID 0 that we'll write all results to. We'll later
// serve different ranges of this file using an index file that we create at the end.
val blockId = ShuffleBlockId(dep.shuffleId, mapId, 0)
outputFile = blockManager.diskBlockManager.getFile(blockId)

// Track location of each range in the output file
val offsets = new Array[Long](numPartitions + 1)
val lengths = new Array[Long](numPartitions)

for ((id, elements) <- partitions) {
if (elements.hasNext) {
val writer = blockManager.getDiskWriter(blockId, outputFile, ser, fileBufferSize,
writeMetrics)
for (elem <- elements) {
writer.write(elem)
}
writer.commitAndClose()
val segment = writer.fileSegment()
offsets(id + 1) = segment.offset + segment.length
lengths(id) = segment.length
} else {
// The partition is empty; don't create a new writer to avoid writing headers, etc
offsets(id + 1) = offsets(id)
}
}

context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled

// Write an index file with the offsets of each block, plus a final offset at the end for the
// end of the output file. This will be used by SortShuffleManager.getBlockLocation to figure
// out where each block begins and ends.
outputFile = blockManager.diskBlockManager.getFile(blockId)
indexFile = blockManager.diskBlockManager.getFile(blockId.name + ".index")

val diskBlockManager = blockManager.diskBlockManager
val indexFile = diskBlockManager.getFile(blockId.name + ".index")
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
try {
var i = 0
while (i < numPartitions + 1) {
out.writeLong(offsets(i))
i += 1
}
} finally {
out.close()
}
val partitionLengths = sorter.writePartitionedFile(blockId, context)

// Register our map output with the ShuffleBlockManager, which handles cleaning it over time
blockManager.shuffleBlockManager.addCompletedMap(dep.shuffleId, mapId, numPartitions)

mapStatus = new MapStatus(blockManager.blockManagerId,
lengths.map(MapOutputTracker.compressSize))
partitionLengths.map(MapOutputTracker.compressSize))
}

/** Close this writer, passing along whether the map completed */
@@ -145,6 +104,9 @@ private[spark] class SortShuffleWriter[K, V, C](
if (outputFile != null) {
outputFile.delete()
}
if (indexFile != null) {
indexFile.delete()
}
return None
}
} finally {
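For reference, the index-writing loop removed from SortShuffleWriter described a layout of numPartitions + 1 longs, where consecutive entries bound each partition's byte range in the single data file and SortShuffleManager.getBlockLocation performs the real lookup; the writing presumably now happens inside sorter.writePartitionedFile, which is not shown in this diff. A hedged sketch of reading one entry back, with illustrative file handling:

  import java.io.{DataInputStream, File, FileInputStream}

  // Sketch: byte range of one reduce partition from a ".index" file.
  // Partition i spans offsets(i) until offsets(i + 1); each offset is an 8-byte long.
  def partitionRange(indexFile: File, reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      in.skipBytes(reduceId * 8)      // seek to offsets(reduceId)
      val start = in.readLong()
      val end = in.readLong()
      (start, end - start)            // (offset into the data file, length of the block)
    } finally {
      in.close()
    }
  }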
