Merge branch 'master' into SPARK-4705
Marcelo Vanzin committed Apr 23, 2015
2 parents c14ec19 + cc48e63 commit f1cb9b3
Showing 191 changed files with 3,055 additions and 1,246 deletions.
22 changes: 13 additions & 9 deletions CONTRIBUTING.md
@@ -1,12 +1,16 @@
## Contributing to Spark

- Contributions via GitHub pull requests are gladly accepted from their original
- author. Along with any pull requests, please state that the contribution is
- your original work and that you license the work to the project under the
- project's open source license. Whether or not you state this explicitly, by
- submitting any copyrighted material via pull request, email, or other means
- you agree to license the material under the project's open source license and
- warrant that you have the legal authority to do so.
+ *Before opening a pull request*, review the
+ [Contributing to Spark wiki](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark).
+ It lists steps that are required before creating a PR. In particular, consider:
+
+ - Is the change important and ready enough to ask the community to spend time reviewing?
+ - Have you searched for existing, related JIRAs and pull requests?
+ - Is this a new feature that can stand alone as a package on http://spark-packages.org ?
+ - Is the change being proposed clearly explained and motivated?

- Please see the [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
- for more information.
+ When you contribute code, you affirm that the contribution is your original work and that you
+ license the work to the project under the project's open source license. Whether or not you
+ state this explicitly, by submitting any copyrighted material via pull request, email, or
+ other means you agree to license the material under the project's open source license and
+ warrant that you have the legal authority to do so.
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,10 +44,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
- val inputMetrics = blockResult.inputMetrics
val existingMetrics = context.taskMetrics
-   .getInputMetricsForReadMethod(inputMetrics.readMethod)
- existingMetrics.incBytesRead(inputMetrics.bytesRead)
+   .getInputMetricsForReadMethod(blockResult.readMethod)
+ existingMetrics.incBytesRead(blockResult.bytes)

val iter = blockResult.data.asInstanceOf[Iterator[T]]
new InterruptibleIterator[T](context, iter) {
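The cached values are handed back wrapped in an InterruptibleIterator. As a rough illustration of that wrapper pattern (not Spark's actual class, which consults the TaskContext), a cancellation-aware iterator can be sketched like this:

```scala
// Minimal sketch of a cancellation-aware iterator wrapper, assuming only a plain
// callback for the cancellation signal; Spark's InterruptibleIterator checks the
// TaskContext and throws a task-killed exception instead.
class CancellableIterator[T](isCancelled: () => Boolean, delegate: Iterator[T])
  extends Iterator[T] {

  override def hasNext: Boolean = {
    // Stop consuming the underlying (possibly cached) data as soon as the task is cancelled.
    if (isCancelled()) throw new InterruptedException("task was cancelled")
    delegate.hasNext
  }

  override def next(): T = delegate.next()
}
```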
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -17,12 +17,12 @@

package org.apache.spark

- import java.util.concurrent.{Executors, TimeUnit}
+ import java.util.concurrent.TimeUnit

import scala.collection.mutable

import org.apache.spark.scheduler._
- import org.apache.spark.util.{Clock, SystemClock, Utils}
+ import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}

/**
* An agent that dynamically allocates and removes executors based on the workload.
@@ -132,8 +132,8 @@ private[spark] class ExecutorAllocationManager(
private val listener = new ExecutorAllocationListener

// Executor that handles the scheduling task.
- private val executor = Executors.newSingleThreadScheduledExecutor(
-   Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
+ private val executor =
+   ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")

/**
* Verify that the settings specified through the config are valid.
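The switch from Executors.newSingleThreadScheduledExecutor plus Utils.namedThreadFactory to a single ThreadUtils call centralizes how these named daemon threads are built. A minimal sketch of such a helper, assuming Guava's ThreadFactoryBuilder is available (Guava is already on Spark's classpath); the real ThreadUtils may differ in details:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory}

import com.google.common.util.concurrent.ThreadFactoryBuilder

// Illustrative ThreadUtils-style factory for named daemon threads; not Spark's exact code.
object DaemonThreads {

  // A ThreadFactory that marks every thread as a daemon and names it prefix-0, prefix-1, ...
  def namedDaemonFactory(prefix: String): ThreadFactory =
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()

  // Single-threaded scheduled executor backed by one named daemon thread.
  def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(namedDaemonFactory(threadName))
}
```

A caller would then write `DaemonThreads.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")`, mirroring the added line above.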
11 changes: 5 additions & 6 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -17,15 +17,15 @@

package org.apache.spark

- import java.util.concurrent.{ScheduledFuture, TimeUnit, Executors}
+ import java.util.concurrent.{ScheduledFuture, TimeUnit}

import scala.collection.mutable

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
- import org.apache.spark.util.Utils
+ import org.apache.spark.util.{ThreadUtils, Utils}

/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -76,11 +76,10 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)

private var timeoutCheckingTask: ScheduledFuture[_] = null

- private val timeoutCheckingThread = Executors.newSingleThreadScheduledExecutor(
-   Utils.namedThreadFactory("heartbeat-timeout-checking-thread"))
+ private val timeoutCheckingThread =
+   ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")

- private val killExecutorThread = Executors.newSingleThreadExecutor(
-   Utils.namedThreadFactory("kill-executor-thread"))
+ private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")

override def onStart(): Unit = {
timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
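Both pools here are built on daemon threads. The reason this matters for a driver-side component is that a non-daemon thread keeps the JVM alive after the main thread exits; the toy program below (not from Spark) shows the difference:

```scala
// Toy illustration: with setDaemon(true) the JVM exits as soon as main() returns;
// with setDaemon(false) it would wait the full minute for the sleeping thread.
object DaemonDemo {
  def main(args: Array[String]): Unit = {
    val t = new Thread(new Runnable {
      override def run(): Unit = Thread.sleep(60 * 1000)
    })
    t.setDaemon(true)
    t.start()
    println("main finished; JVM exits immediately because the worker is a daemon")
  }
}
```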
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -431,7 +431,15 @@ private[spark] object SparkConf extends Logging {
"spark.yarn.am.waitTime" -> Seq(
AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
// Translate old value to a duration, with 10s wait time per try.
- translation = s => s"${s.toLong * 10}s"))
+ translation = s => s"${s.toLong * 10}s")),
+ "spark.rpc.numRetries" -> Seq(
+   AlternateConfig("spark.akka.num.retries", "1.4")),
+ "spark.rpc.retry.wait" -> Seq(
+   AlternateConfig("spark.akka.retry.wait", "1.4")),
+ "spark.rpc.askTimeout" -> Seq(
+   AlternateConfig("spark.akka.askTimeout", "1.4")),
+ "spark.rpc.lookupTimeout" -> Seq(
+   AlternateConfig("spark.akka.lookupTimeout", "1.4"))
)
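These new entries map the old Akka-specific keys to the transport-neutral spark.rpc.* names, recording the version ("1.4") in which the rename happened. A hedged sketch of how such a fallback lookup could work — AlternateConfig below is a stand-in with the same shape as the entries above, not Spark's exact implementation:

```scala
// Stand-in for the entries above: an old key, the version that deprecated it, and an
// optional value translation (the spark.yarn.am.waitTime case above multiplies by 10s).
case class AlternateConfig(key: String, version: String, translation: String => String = null)

object DeprecatedConfigSketch {
  val configsWithAlternatives: Map[String, Seq[AlternateConfig]] = Map(
    "spark.rpc.askTimeout" -> Seq(AlternateConfig("spark.akka.askTimeout", "1.4")),
    "spark.rpc.numRetries" -> Seq(AlternateConfig("spark.akka.num.retries", "1.4")))

  // Prefer the new key; otherwise fall back to the first deprecated alternate that is set,
  // applying its translation if one is defined.
  def getWithFallback(settings: Map[String, String], key: String): Option[String] = {
    settings.get(key).orElse {
      configsWithAlternatives.getOrElse(key, Nil).collectFirst {
        case alt if settings.contains(alt.key) =>
          val raw = settings(alt.key)
          if (alt.translation != null) alt.translation(raw) else raw
      }
    }
  }
}
```

For example, `getWithFallback(Map("spark.akka.askTimeout" -> "30"), "spark.rpc.askTimeout")` would yield `Some("30")` under this sketch.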

/**
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -27,7 +27,7 @@ import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
- import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
+ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils}

/**
* Proxy that relays messages to the driver.
@@ -36,7 +36,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
extends Actor with ActorLogReceive with Logging {

var masterActor: ActorSelection = _
- val timeout = AkkaUtils.askTimeout(conf)
+ val timeout = RpcUtils.askTimeout(conf)

override def preStart(): Unit = {
masterActor = context.actorSelection(
@@ -155,7 +155,7 @@ object Client {
if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
conf.set("spark.akka.logLifecycleEvents", "true")
}
- conf.set("spark.akka.askTimeout", "10")
+ conf.set("spark.rpc.askTimeout", "10")
conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
Logger.getRootLogger.setLevel(driverArgs.logLevel)
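The client now reads its ask timeout through RpcUtils and sets the new spark.rpc.askTimeout key instead of spark.akka.askTimeout. A minimal sketch of what an askTimeout helper might look like — the fallback order and the 120-second default are assumptions for illustration, not necessarily what Spark's RpcUtils does:

```scala
import scala.concurrent.duration._

import org.apache.spark.SparkConf

object RpcTimeoutSketch {
  // Prefer the transport-neutral key, fall back to the legacy Akka key, default to 120 seconds.
  def askTimeout(conf: SparkConf): FiniteDuration =
    conf.get("spark.rpc.askTimeout", conf.get("spark.akka.askTimeout", "120")).toLong.seconds
}
```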

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
- import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
+ import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils}

/**
* Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -193,7 +193,7 @@ private[spark] class AppClient(
def stop() {
if (actor != null) {
try {
- val timeout = AkkaUtils.askTimeout(conf)
+ val timeout = RpcUtils.askTimeout(conf)
val future = actor.ask(StopAppClient)(timeout)
Await.result(future, timeout)
} catch {
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
- import org.apache.spark.util.Utils
+ import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.{Logging, SecurityManager, SparkConf}


@@ -99,7 +99,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private val replayExecutor: ExecutorService = {
if (!conf.contains("spark.testing")) {
- Executors.newSingleThreadExecutor(Utils.namedThreadFactory("log-replay-executor"))
+ ThreadUtils.newDaemonSingleThreadExecutor("log-replay-executor")
} else {
MoreExecutors.sameThreadExecutor()
}
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
- import org.apache.spark.util.SignalLogger
+ import org.apache.spark.util.{SignalLogger, Utils}

/**
* A web server that renders SparkUIs of completed applications.
@@ -203,9 +203,7 @@ object HistoryServer extends Logging {
val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()

- Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
-   override def run(): Unit = server.stop()
- })
+ Utils.addShutdownHook { () => server.stop() }

// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -47,7 +47,7 @@ import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.ui.SparkUI
- import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
+ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, SignalLogger, Utils}

private[master] class Master(
host: String,
@@ -931,7 +931,7 @@ private[deploy] object Master extends Logging {
securityManager = securityMgr)
val actor = actorSystem.actorOf(
Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
- val timeout = AkkaUtils.askTimeout(conf)
+ val timeout = RpcUtils.askTimeout(conf)
val portsRequest = actor.ask(BoundPortsRequest)(timeout)
val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -21,7 +21,7 @@ import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
- import org.apache.spark.util.AkkaUtils
+ import org.apache.spark.util.RpcUtils

/**
* Web UI server for the standalone master.
@@ -31,7 +31,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {

val masterActorRef = master.self
- val timeout = AkkaUtils.askTimeout(master.conf)
+ val timeout = RpcUtils.askTimeout(master.conf)
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)

initialize()
core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -32,7 +32,7 @@ import org.json4s._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
- import org.apache.spark.util.{AkkaUtils, Utils}
+ import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.deploy.ClientArguments._

@@ -223,7 +223,7 @@ private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf)
}

protected def handleKill(submissionId: String): KillSubmissionResponse = {
- val askTimeout = AkkaUtils.askTimeout(conf)
+ val askTimeout = RpcUtils.askTimeout(conf)
val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse](
DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout)
val k = new KillSubmissionResponse
@@ -257,7 +257,7 @@ private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
}

protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
- val askTimeout = AkkaUtils.askTimeout(conf)
+ val askTimeout = RpcUtils.askTimeout(conf)
val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse](
DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout)
val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
@@ -321,7 +321,7 @@ private[rest] class SubmitRequestServlet(
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
requestMessage match {
case submitRequest: CreateSubmissionRequest =>
- val askTimeout = AkkaUtils.askTimeout(conf)
+ val askTimeout = RpcUtils.askTimeout(conf)
val driverDescription = buildDriverDescription(submitRequest)
val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
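Each servlet resolves its timeout via RpcUtils and then does a blocking ask against the master actor. A hedged sketch of an askWithReply-style helper as used here — Spark's real AkkaUtils.askWithReply also retries on failure, which this omits:

```scala
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag

import akka.actor.ActorRef
import akka.pattern.ask

object AskSketch {
  // Send `message`, wait at most `timeout` for the reply, and cast it to the expected type.
  def askWithReply[T: ClassTag](message: Any, actor: ActorRef, timeout: FiniteDuration): T = {
    val future = actor.ask(message)(timeout)
    Await.result(future, timeout).asInstanceOf[T]
  }
}
```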
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -28,6 +28,7 @@ import com.google.common.io.Files
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+ import org.apache.spark.util.Utils
import org.apache.spark.util.logging.FileAppender

/**
@@ -61,20 +62,15 @@ private[deploy] class ExecutorRunner(

// NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
// make sense to remove this in the future.
- private var shutdownHook: Thread = null
+ private var shutdownHook: AnyRef = null

private[worker] def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
- shutdownHook = new Thread() {
-   override def run() {
-     killProcess(Some("Worker shutting down"))
-   }
- }
- Runtime.getRuntime.addShutdownHook(shutdownHook)
+ shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker shutting down")) }
}

/**
@@ -106,7 +102,7 @@ private[deploy] class ExecutorRunner(
workerThread = null
state = ExecutorState.KILLED
try {
- Runtime.getRuntime.removeShutdownHook(shutdownHook)
+ Utils.removeShutdownHook(shutdownHook)
} catch {
case e: IllegalStateException => None
}
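Both the registration and the removal now go through Utils instead of touching Runtime directly, which is what lets the hook handle become an opaque AnyRef. A rough sketch of a centralized hook manager in that spirit — Spark's real implementation also orders hooks by priority, which is left out here:

```scala
import scala.collection.mutable

// Illustrative shutdown-hook manager; not Spark's actual Utils implementation.
object ShutdownHookSketch {
  private val hooks = mutable.LinkedHashSet[() => Unit]()

  // One JVM-level hook runs every registered function; individual callers never
  // install their own java.lang.Thread hooks.
  Runtime.getRuntime.addShutdownHook(new Thread("shutdown-hook-runner") {
    override def run(): Unit = hooks.synchronized { hooks.foreach(h => h()) }
  })

  // Returns an opaque handle the caller can later pass to removeShutdownHook.
  def addShutdownHook(hook: () => Unit): AnyRef = hooks.synchronized {
    hooks += hook
    hook
  }

  def removeShutdownHook(ref: AnyRef): Boolean = hooks.synchronized {
    hooks.remove(ref.asInstanceOf[() => Unit])
  }
}
```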
core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -25,7 +25,7 @@ import org.apache.spark.deploy.worker.Worker
import org.apache.spark.deploy.worker.ui.WorkerWebUI._
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
- import org.apache.spark.util.AkkaUtils
+ import org.apache.spark.util.RpcUtils

/**
* Web UI server for the standalone worker.
@@ -38,7 +38,7 @@ class WorkerWebUI(
extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
with Logging {

- private[ui] val timeout = AkkaUtils.askTimeout(worker.conf)
+ private[ui] val timeout = RpcUtils.askTimeout(worker.conf)

initialize()

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+ import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{SignalLogger, Utils}

private[spark] class CoarseGrainedExecutorBackend(
@@ -47,6 +48,10 @@ private[spark] class CoarseGrainedExecutorBackend(
var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None

+ // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
+ // to be changed so that we don't share the serializer instance across threads
+ private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()

override def onStart() {
import scala.concurrent.ExecutionContext.Implicits.global
logInfo("Connecting to driver: " + driverUrl)
@@ -83,7 +88,6 @@ private[spark] class CoarseGrainedExecutorBackend(
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
- val ser = env.closureSerializer.newInstance()
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
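Hoisting the SerializerInstance into a field saves re-creating it on every LaunchTask message, and the new comment records the assumption that makes this safe: the backend handles messages on a single thread, and a SerializerInstance is not guaranteed to be thread-safe. If that assumption ever changed, one common fix is a per-thread instance; a hedged sketch (the class and names below are illustrative, not Spark code):

```scala
import java.nio.ByteBuffer

import scala.reflect.ClassTag

import org.apache.spark.SparkEnv
import org.apache.spark.serializer.SerializerInstance

// Illustrative only: one SerializerInstance per thread via ThreadLocal, for a caller
// that might deserialize task descriptions from several threads at once.
class PerThreadDeserializer(env: SparkEnv) {
  private val ser = new ThreadLocal[SerializerInstance] {
    override def initialValue(): SerializerInstance = env.closureSerializer.newInstance()
  }

  def deserialize[T: ClassTag](bytes: ByteBuffer): T = ser.get().deserialize[T](bytes)
}
```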
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -21,7 +21,7 @@ import java.io.File
import java.lang.management.ManagementFactory
import java.net.URL
import java.nio.ByteBuffer
- import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
+ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -76,7 +76,7 @@ private[spark] class Executor(
}

// Start worker thread pool
- private val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
+ private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
private val executorSource = new ExecutorSource(threadPool, executorId)

if (!isLocal) {
@@ -110,8 +110,7 @@ private[spark] class Executor(
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

// Executor for the heartbeat task.
- private val heartbeater = Executors.newSingleThreadScheduledExecutor(
-   Utils.namedThreadFactory("driver-heartbeater"))
+ private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")

startDriverHeartbeater()
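The heartbeater is now a named daemon scheduled executor, and startDriverHeartbeater() presumably schedules the periodic report on it. A small sketch of that scheduling pattern — the interval parameter is illustrative, not Spark's actual configuration key or default:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object HeartbeaterSketch {
  private val heartbeater: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()

  // Run `sendHeartbeat` every `intervalMs` milliseconds until the executor shuts down.
  def start(intervalMs: Long)(sendHeartbeat: () => Unit): Unit = {
    val task = new Runnable {
      override def run(): Unit = sendHeartbeat()
    }
    heartbeater.scheduleAtFixedRate(task, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
  }
}
```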


0 comments on commit f1cb9b3
