Merge remote-tracking branch 'upstream/master' into sql-udt2
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
jkbradley committed Nov 2, 2014
2 parents f3c72fe + c9f8400 commit 15c10a6
Showing 64 changed files with 1,471 additions and 225 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -117,8 +117,8 @@ case object TaskKilled extends TaskFailedReason {
  * the task crashed the JVM.
  */
 @DeveloperApi
-case object ExecutorLostFailure extends TaskFailedReason {
-  override def toErrorString: String = "ExecutorLostFailure (executor lost)"
+case class ExecutorLostFailure(execId: String) extends TaskFailedReason {
+  override def toErrorString: String = s"ExecutorLostFailure (executor ${execId} lost)"
 }
 
 /**
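A quick sanity check implied by the new toErrorString (illustrative only, not part of the patch's tests):

assert(ExecutorLostFailure("42").toErrorString == "ExecutorLostFailure (executor 42 lost)")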
25 changes: 15 additions & 10 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -104,6 +104,9 @@ private[spark] class Executor(
   // to send the result back.
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
+  // Limit of bytes for total size of results (default is 1GB)
+  private val maxResultSize = Utils.getMaxResultSize(conf)
+
   // Start worker thread pool
   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
 
@@ -210,25 +213,27 @@ private[spark] class Executor(
         val resultSize = serializedDirectResult.limit
 
         // directSend = sending directly back to the driver
-        val (serializedResult, directSend) = {
-          if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
+        val serializedResult = {
+          if (resultSize > maxResultSize) {
+            logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
+              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
+              s"dropping it.")
+            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
+          } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(
               blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
-            (ser.serialize(new IndirectTaskResult[Any](blockId)), false)
+            logInfo(
+              s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
+            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
           } else {
-            (serializedDirectResult, true)
+            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
+            serializedDirectResult
           }
         }
 
         execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
 
-        if (directSend) {
-          logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
-        } else {
-          logInfo(
-            s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
-        }
       } catch {
         case ffe: FetchFailedException => {
           val reason = ffe.toTaskEndReason
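The result path now has three tiers; the following is an illustrative restatement of the branching above, not Spark code (the helper name resultRoute is made up):

// Sketch only: summarizes the three branches in the hunk above.
def resultRoute(resultSize: Long, maxResultSize: Long, akkaFrameSize: Long, reservedBytes: Long): String = {
  if (resultSize > maxResultSize) {
    "dropped: only an IndirectTaskResult carrying the size is sent"
  } else if (resultSize >= akkaFrameSize - reservedBytes) {
    "stored in the BlockManager; IndirectTaskResult(blockId, size) is sent"
  } else {
    "serialized bytes sent directly to the driver"
  }
}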
@@ -31,8 +31,8 @@ import org.apache.spark.util.Utils
 private[spark] sealed trait TaskResult[T]
 
 /** A reference to a DirectTaskResult that has been stored in the worker's BlockManager. */
-private[spark]
-case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Serializable
+private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Int)
+  extends TaskResult[T] with Serializable
 
 /** A TaskResult that contains the task's return value and accumulator updates. */
 private[spark]
@@ -47,9 +47,18 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
     getTaskResultExecutor.execute(new Runnable {
       override def run(): Unit = Utils.logUncaughtExceptions {
         try {
-          val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
-            case directResult: DirectTaskResult[_] => directResult
-            case IndirectTaskResult(blockId) =>
+          val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
+            case directResult: DirectTaskResult[_] =>
+              if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
+                return
+              }
+              (directResult, serializedData.limit())
+            case IndirectTaskResult(blockId, size) =>
+              if (!taskSetManager.canFetchMoreResults(size)) {
+                // dropped by executor if size is larger than maxResultSize
+                sparkEnv.blockManager.master.removeBlock(blockId)
+                return
+              }
               logDebug("Fetching indirect task result for TID %s".format(tid))
               scheduler.handleTaskGettingResult(taskSetManager, tid)
               val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
@@ -64,9 +73,10 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
               val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
                 serializedTaskResult.get)
               sparkEnv.blockManager.master.removeBlock(blockId)
-              deserializedResult
+              (deserializedResult, size)
           }
-          result.metrics.resultSize = serializedData.limit()
+
+          result.metrics.resultSize = size
           scheduler.handleSuccessfulTask(taskSetManager, tid, result)
         } catch {
           case cnf: ClassNotFoundException =>
@@ -23,13 +23,12 @@ import java.util.Arrays
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
-import scala.math.max
-import scala.math.min
+import scala.math.{min, max}
 
 import org.apache.spark._
-import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.{Clock, SystemClock}
+import org.apache.spark.TaskState.TaskState
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
@@ -68,6 +67,9 @@ private[spark] class TaskSetManager(
   val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
   val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
 
+  // Limit of bytes for total size of results (default is 1GB)
+  val maxResultSize = Utils.getMaxResultSize(conf)
+
   // Serializer for closures and tasks.
   val env = SparkEnv.get
   val ser = env.closureSerializer.newInstance()
@@ -89,6 +91,8 @@ private[spark] class TaskSetManager(
   var stageId = taskSet.stageId
   var name = "TaskSet_" + taskSet.stageId.toString
   var parent: Pool = null
+  var totalResultSize = 0L
+  var calculatedTasks = 0
 
   val runningTasksSet = new HashSet[Long]
   override def runningTasks = runningTasksSet.size
@@ -515,12 +519,33 @@
     index
   }
 
+  /**
+   * Marks the task as getting result and notifies the DAG Scheduler
+   */
   def handleTaskGettingResult(tid: Long) = {
     val info = taskInfos(tid)
     info.markGettingResult()
     sched.dagScheduler.taskGettingResult(info)
   }
 
+  /**
+   * Check whether has enough quota to fetch the result with `size` bytes
+   */
+  def canFetchMoreResults(size: Long): Boolean = synchronized {
+    totalResultSize += size
+    calculatedTasks += 1
+    if (maxResultSize > 0 && totalResultSize > maxResultSize) {
+      val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
+        s"(${Utils.bytesToString(totalResultSize)}) is bigger than maxResultSize " +
+        s"(${Utils.bytesToString(maxResultSize)})"
+      logError(msg)
+      abort(msg)
+      false
+    } else {
+      true
+    }
+  }
+
   /**
    * Marks the task as successful and notifies the DAGScheduler that a task has ended.
    */
@@ -707,7 +732,7 @@
     }
     // Also re-enqueue any tasks that were running on the node
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
-      handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure)
+      handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(execId))
     }
     // recalculate valid locality levels and waits when executor is lost
     recomputeLocality()
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -272,7 +272,7 @@ private[spark] object JsonProtocol {
 
   def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
     val reason = Utils.getFormattedClassName(taskEndReason)
-    val json = taskEndReason match {
+    val json: JObject = taskEndReason match {
       case fetchFailed: FetchFailed =>
         val blockManagerAddress = Option(fetchFailed.bmAddress).
           map(blockManagerIdToJson).getOrElse(JNothing)
@@ -287,6 +287,8 @@
         ("Description" -> exceptionFailure.description) ~
         ("Stack Trace" -> stackTrace) ~
         ("Metrics" -> metrics)
+      case ExecutorLostFailure(executorId) =>
+        ("Executor ID" -> executorId)
       case _ => Utils.emptyJson
     }
     ("Reason" -> reason) ~ json
@@ -636,7 +638,9 @@
         new ExceptionFailure(className, description, stackTrace, metrics)
       case `taskResultLost` => TaskResultLost
       case `taskKilled` => TaskKilled
-      case `executorLostFailure` => ExecutorLostFailure
+      case `executorLostFailure` =>
+        val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
+        ExecutorLostFailure(executorId.getOrElse("Unknown"))
       case `unknownReason` => UnknownReason
     }
   }
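A minimal round-trip sketch of the two JsonProtocol hunks above (the JSON shape is inferred from the fields written there; taskEndReasonFromJson is assumed to be the parsing counterpart that the second hunk lives in):

// Sketch: ExecutorLostFailure("3") should serialize to roughly
// {"Reason": "ExecutorLostFailure", "Executor ID": "3"} and parse back unchanged.
val roundTripped = JsonProtocol.taskEndReasonFromJson(
  JsonProtocol.taskEndReasonToJson(ExecutorLostFailure("3")))
assert(roundTripped == ExecutorLostFailure("3"))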
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -20,8 +20,10 @@ package org.apache.spark.util
 import java.io._
 import java.net._
 import java.nio.ByteBuffer
+import java.util.jar.Attributes.Name
 import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
+import java.util.jar.{Manifest => JarManifest}
 
 import scala.collection.JavaConversions._
 import scala.collection.Map
@@ -1720,6 +1722,11 @@ private[spark] object Utils extends Logging {
     method.invoke(obj, values.toSeq: _*)
   }
 
+  // Limit of bytes for total size of results (default is 1GB)
+  def getMaxResultSize(conf: SparkConf): Long = {
+    memoryStringToMb(conf.get("spark.driver.maxResultSize", "1g")).toLong << 20
+  }
+
   /**
    * Return the current system LD_LIBRARY_PATH name
   */
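A quick check of the unit conversion above (a sketch, assuming SparkConf and the private[spark] Utils object are in scope): memoryStringToMb returns megabytes, and the shift by 20 converts megabytes to bytes.

// "1g" -> 1024 MB -> 1024L << 20 == 1073741824 bytes
val conf = new SparkConf().set("spark.driver.maxResultSize", "1g")
assert(Utils.getMaxResultSize(conf) == (1024L << 20))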
@@ -1754,6 +1761,12 @@ private[spark] object Utils extends Logging {
     s"$libraryPathEnvName=$libraryPath$ampersand"
   }
 
+  lazy val sparkVersion =
+    SparkContext.jarOfObject(this).map { path =>
+      val manifestUrl = new URL(s"jar:file:$path!/META-INF/MANIFEST.MF")
+      val manifest = new JarManifest(manifestUrl.openStream())
+      manifest.getMainAttributes.getValue(Name.IMPLEMENTATION_VERSION)
+    }.getOrElse("Unknown")
 }
 
 /**
@@ -40,7 +40,7 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
       // Only remove the result once, since we'd like to test the case where the task eventually
       // succeeds.
       serializer.get().deserialize[TaskResult[_]](serializedData) match {
-        case IndirectTaskResult(blockId) =>
+        case IndirectTaskResult(blockId, size) =>
           sparkEnv.blockManager.master.removeBlock(blockId)
         case directResult: DirectTaskResult[_] =>
           taskSetManager.abort("Internal error: expect only indirect results")
@@ -563,6 +563,31 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(manager.emittedTaskSizeWarning)
   }
 
+  test("abort the job if total size of results is too large") {
+    val conf = new SparkConf().set("spark.driver.maxResultSize", "2m")
+    sc = new SparkContext("local", "test", conf)
+
+    def genBytes(size: Int) = { (x: Int) =>
+      val bytes = Array.ofDim[Byte](size)
+      scala.util.Random.nextBytes(bytes)
+      bytes
+    }
+
+    // multiple 1k result
+    val r = sc.makeRDD(0 until 10, 10).map(genBytes(1024)).collect()
+    assert(10 === r.size )
+
+    // single 10M result
+    val thrown = intercept[SparkException] {sc.makeRDD(genBytes(10 << 20)(0), 1).collect()}
+    assert(thrown.getMessage().contains("bigger than maxResultSize"))
+
+    // multiple 1M results
+    val thrown2 = intercept[SparkException] {
+      sc.makeRDD(0 until 10, 10).map(genBytes(1 << 20)).collect()
+    }
+    assert(thrown2.getMessage().contains("bigger than maxResultSize"))
+  }
+
   test("speculative and noPref task should be scheduled after node-local") {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
@@ -119,7 +119,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       new ExceptionFailure("Exception", "description", null, None),
       TaskResultLost,
       TaskKilled,
-      ExecutorLostFailure,
+      ExecutorLostFailure("0"),
       UnknownReason)
     var failCount = 0
     for (reason <- taskFailedReasons) {
@@ -115,7 +115,7 @@ class JsonProtocolSuite extends FunSuite {
     testTaskEndReason(exceptionFailure)
     testTaskEndReason(TaskResultLost)
     testTaskEndReason(TaskKilled)
-    testTaskEndReason(ExecutorLostFailure)
+    testTaskEndReason(ExecutorLostFailure("100"))
     testTaskEndReason(UnknownReason)
 
     // BlockId
@@ -403,7 +403,8 @@
         assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
       case (TaskResultLost, TaskResultLost) =>
       case (TaskKilled, TaskKilled) =>
-      case (ExecutorLostFailure, ExecutorLostFailure) =>
+      case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) =>
+        assert(execId1 === execId2)
       case (UnknownReason, UnknownReason) =>
       case _ => fail("Task end reasons don't match in types!")
     }
12 changes: 12 additions & 0 deletions docs/configuration.md
@@ -111,6 +111,18 @@ of the most common options to set are:
     (e.g. <code>512m</code>, <code>2g</code>).
   </td>
 </tr>
+<tr>
+  <td><code>spark.driver.maxResultSize</code></td>
+  <td>1g</td>
+  <td>
+    Limit of total size of serialized results of all partitions for each Spark action (e.g. collect).
+    Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size
+    is above this limit.
+    Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory
+    and memory overhead of objects in JVM). Setting a proper limit can protect the driver from
+    out-of-memory errors.
+  </td>
+</tr>
 <tr>
   <td><code>spark.serializer</code></td>
   <td>org.apache.spark.serializer.<br />JavaSerializer</td>
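For example, the new property can be set like any other driver setting (a usage sketch; only the property name and default come from the row above, the application name is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

// Cap the combined size of results collected to the driver at 2g; actions whose
// collected results exceed this abort the job with a SparkException.
val conf = new SparkConf()
  .setAppName("max-result-size-demo")
  .set("spark.driver.maxResultSize", "2g")
val sc = new SparkContext(conf)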
@@ -134,7 +134,7 @@ case class AttributeReference(
     val qualifiers: Seq[String] = Nil) extends Attribute with trees.LeafNode[Expression] {
 
   override def equals(other: Any) = other match {
-    case ar: AttributeReference => exprId == ar.exprId && dataType == ar.dataType
+    case ar: AttributeReference => name == ar.name && exprId == ar.exprId && dataType == ar.dataType
     case _ => false
   }
 
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql
 
+/**
+ * Catalyst is a library for manipulating relational query plans. All classes in catalyst are
+ * considered an internal API to Spark SQL and are subject to change between minor releases.
+ */
 package object catalyst {
   /**
    * A JVM-global lock that should be used to prevent thread safety issues when using things in
@@ -21,6 +21,15 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.TreeNode
 
+/**
+ * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
+ * be used for execution. If this strategy does not apply to the give logical operation then an
+ * empty list should be returned.
+ */
+abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
+  def apply(plan: LogicalPlan): Seq[PhysicalPlan]
+}
+
 /**
  * Abstract class for transforming [[plans.logical.LogicalPlan LogicalPlan]]s into physical plans.
  * Child classes are responsible for specifying a list of [[Strategy]] objects that each of which
@@ -35,16 +44,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
  */
 abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
   /** A list of execution strategies that can be used by the planner */
-  def strategies: Seq[Strategy]
-
-  /**
-   * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
-   * be used for execution. If this strategy does not apply to the give logical operation then an
-   * empty list should be returned.
-   */
-  abstract protected class Strategy extends Logging {
-    def apply(plan: LogicalPlan): Seq[PhysicalPlan]
-  }
+  def strategies: Seq[GenericStrategy[PhysicalPlan]]
 
   /**
    * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
@@ -462,7 +462,7 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
 case class StructField(
     name: String,
     dataType: DataType,
-    nullable: Boolean,
+    nullable: Boolean = true,
     metadata: Metadata = Metadata.empty) {
 
   private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
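A small sketch of what the new default enables (the import path is an assumption; in this branch the SQL data types live under org.apache.spark.sql.catalyst.types):

import org.apache.spark.sql.catalyst.types._

// nullable can now be omitted and defaults to true:
val f1 = StructField("name", StringType)                  // nullable = true
val f2 = StructField("id", IntegerType, nullable = false) // still explicit when needed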