SPARK-2099. Report progress while task is running.
sryza committed Aug 1, 2014
1 parent 284771e commit 0dae734
Showing 23 changed files with 413 additions and 164 deletions.
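In outline: each executor starts a "Driver Heartbeater" thread that periodically sends a Heartbeat message, carrying the TaskMetrics of its running tasks and its BlockManagerId, to a new HeartbeatReceiver actor on the driver. The driver posts the metrics to the listener bus as SparkListenerExecutorMetricsUpdate events and replies whether the executor's block manager needs to re-register. The interval comes from spark.executor.heartbeatInterval (default 2000 ms in this patch). A minimal sketch of overriding it, with an illustrative app name and local master:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver setup: raise the heartbeat interval from the 2000 ms default
// introduced by this patch to 5 seconds. The config key comes from Executor.scala below;
// the app name and local master are illustrative only.
val conf = new SparkConf()
  .setAppName("heartbeat-interval-demo")
  .setMaster("local[*]")
  .set("spark.executor.heartbeatInterval", "5000")
val sc = new SparkContext(conf)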
39 changes: 39 additions & 0 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -0,0 +1,39 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import akka.actor.Actor
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.TaskScheduler

case class Heartbeat(
    executorId: String,
    taskMetrics: Array[(Long, TaskMetrics)],
    blockManagerId: BlockManagerId)
  extends Serializable

/**
 * Lives in the driver to receive heartbeats from executors.
 */
class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor {
  override def receive = {
    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
      sender ! scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)
  }
}
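Taken together with the changes below, the round trip looks roughly like this; free names such as env, conf, executorId and runningTaskMetrics stand for executor-side state and are placeholders, not part of this file:

// Driver side (SparkContext change below): register the receiver under a well-known name.
val heartbeatReceiver = env.actorSystem.actorOf(
  Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")

// Executor side (Executor.startDriverHeartbeater below): resolve the driver actor,
// send the Heartbeat, and re-register the block manager if the driver does not know it.
val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)
val knownByDriver = AkkaUtils.askWithReply[Boolean](
  Heartbeat(executorId, runningTaskMetrics, env.blockManager.blockManagerId),
  heartbeatReceiverRef, AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf),
  AkkaUtils.lookupTimeout(conf))
if (!knownByDriver) {
  env.blockManager.reregister()
}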
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -50,6 +50,7 @@ import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
import akka.actor.Props

/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -307,6 +308,8 @@ class SparkContext(config: SparkConf) extends Logging {

  // Create and start the scheduler
  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
  private val heartbeatReceiver = env.actorSystem.actorOf(
    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
  @volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
@@ -992,6 +995,9 @@ class SparkContext(config: SparkConf) extends Logging {
    if (dagSchedulerCopy != null) {
      env.metricsSystem.report()
      metadataCleaner.cancel()
      if (heartbeatReceiver != null) {
        env.actorSystem.stop(heartbeatReceiver)
      }
      cleaner.foreach(_.stop())
      dagSchedulerCopy.stop()
      taskScheduler = null
8 changes: 1 addition & 7 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -193,13 +193,7 @@ object SparkEnv extends Logging {
        logInfo("Registering " + name)
        actorSystem.actorOf(Props(newActor), name = name)
      } else {
        val driverHost: String = conf.get("spark.driver.host", "localhost")
        val driverPort: Int = conf.getInt("spark.driver.port", 7077)
        Utils.checkHost(driverHost, "Expected hostname")
        val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
        val timeout = AkkaUtils.lookupTimeout(conf)
        logInfo(s"Connecting to $name: $url")
        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
        AkkaUtils.makeDriverRef(name, conf, actorSystem)
      }
    }

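The inline actor lookup removed here is consolidated into AkkaUtils.makeDriverRef, which the executor's heartbeat code also uses. A rough sketch of what such a helper does, reconstructed from the deleted lines above (the actual body of AkkaUtils.makeDriverRef is not shown in this diff; the timeout below is simplified):

import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.{ActorRef, ActorSystem}
import org.apache.spark.SparkConf

// Sketch only: resolve the driver-side actor registered under `name`, mirroring
// the inline lookup this commit deletes from SparkEnv above.
def makeDriverRefSketch(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
  val driverHost = conf.get("spark.driver.host", "localhost")
  val driverPort = conf.getInt("spark.driver.port", 7077)
  val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
  val timeout = 30.seconds
  Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}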
52 changes: 49 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
import java.util.concurrent._

import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark._
import org.apache.spark.scheduler._
@@ -48,6 +48,8 @@ private[spark] class Executor(

  private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))

  private var isStopped = false

  // No ip or host:port - just hostname
  Utils.checkHost(slaveHostname, "Expected executed slave to be a hostname")
  // must not have port specified.
@@ -107,6 +109,8 @@ private[spark] class Executor(
  // Maintains the list of running tasks.
  private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

  startDriverHeartbeater()

  def launchTask(
      context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
    val tr = new TaskRunner(context, taskId, taskName, serializedTask)
@@ -141,11 +145,11 @@ private[spark] class Executor(
  }

  class TaskRunner(
      execBackend: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer)
      execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer)
    extends Runnable {

    @volatile private var killed = false
    @volatile private var task: Task[Any] = _
    @volatile var task: Task[Any] = _

    def kill(interruptThread: Boolean) {
      logInfo(s"Executor is trying to kill $taskName (TID $taskId)")
@@ -354,4 +358,46 @@ private[spark] class Executor(
      }
    }
  }

  def stop() {
    isStopped = true
    threadPool.shutdown()
  }

  def startDriverHeartbeater() {
    val interval = conf.getInt("spark.executor.heartbeatInterval", 2000)
    val timeout = AkkaUtils.lookupTimeout(conf)
    val retryAttempts = AkkaUtils.numRetries(conf)
    val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
    val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)

    val t = new Thread() {
      override def run() {
        // Sleep a random interval so the heartbeats don't end up in sync
        Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])

        while (!isStopped) {
          val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
          for (taskRunner <- runningTasks.values()) {
            Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
              tasksMetrics += ((taskRunner.taskId, metrics))
            }
          }

          val message = Heartbeat(executorId, tasksMetrics.toArray,
            env.blockManager.blockManagerId)
          val reregister = !AkkaUtils.askWithReply[Boolean](message, heartbeatReceiverRef,
            retryAttempts, retryIntervalMs, timeout)
          if (reregister) {
            logWarning("Told to re-register on heartbeat")
            env.blockManager.reregister()
          }
          Thread.sleep(interval)
        }
      }
    }
    t.setDaemon(true)
    t.setName("Driver Heartbeater")
    t.start()
  }
}
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -29,7 +29,6 @@ import scala.reflect.ClassTag
import scala.util.control.NonFatal

import akka.actor._
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Stop
import akka.pattern.ask
import akka.util.Timeout
@@ -39,8 +38,10 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
import org.apache.spark.util.{CallSite, SystemClock, Clock, Utils}
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

/**
 * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -154,6 +155,21 @@ class DAGScheduler(
    eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
  }

  /**
   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
  def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, Int, TaskMetrics)],
      blockManagerId: BlockManagerId): Boolean = {
    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
    implicit val timeout = Timeout(30 seconds)

    Await.result(
      blockManagerMaster.driverActor ? BlockManagerHeartbeat(blockManagerId),
      timeout.duration).asInstanceOf[Boolean]
  }

  // Called by TaskScheduler when an executor fails.
  def executorLost(execId: String) {
    eventProcessActor ! ExecutorLost(execId)
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -75,6 +75,9 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent

case class SparkListenerExecutorMetricsUpdate(execId: String,
    taskMetrics: Seq[(Long, Int, TaskMetrics)]) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
  extends SparkListenerEvent
@@ -158,6 +161,11 @@ trait SparkListener {
   * Called when the application ends
   */
  def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { }

  /**
   * Called when the driver receives task metrics from an executor in a heartbeat.
   */
  def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { }
}

/**
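The new event and callback give applications a hook for observing in-progress task metrics. A minimal sketch of a listener built on them; the class name, log format, and registration call (sc.addSparkListener) are illustrative, not part of this commit:

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Hypothetical listener that logs task progress as heartbeats arrive.
class TaskProgressListener extends SparkListener {
  override def onExecutorMetricsUpdate(update: SparkListenerExecutorMetricsUpdate) {
    // taskMetrics carries (taskId, stageId, TaskMetrics) triples, as assembled in
    // TaskSchedulerImpl.executorHeartbeatReceived below.
    for ((taskId, stageId, metrics) <- update.taskMetrics) {
      println(s"executor ${update.execId}: new metrics for task $taskId in stage $stageId")
    }
  }
}

// Registered on the driver, e.g.: sc.addSparkListener(new TaskProgressListener)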
core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -68,6 +68,8 @@ private[spark] trait SparkListenerBus extends Logging {
        foreachListener(_.onApplicationStart(applicationStart))
      case applicationEnd: SparkListenerApplicationEnd =>
        foreachListener(_.onApplicationEnd(applicationEnd))
      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
      case SparkListenerShutdown =>
    }
  }
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -18,6 +18,8 @@
package org.apache.spark.scheduler

import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId

/**
 * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl.
@@ -54,4 +56,8 @@ private[spark] trait TaskScheduler {

  // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
  def defaultParallelism(): Int

  // Update metrics for in-progress tasks; returns false if the executor's block manager
  // should re-register with the driver.
  def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
      blockManagerId: BlockManagerId): Boolean
}
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -32,6 +32,9 @@ import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.Utils
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import akka.actor.Props

/**
 * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -320,6 +323,24 @@ private[spark] class TaskSchedulerImpl(
    }
  }

  /**
   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
  override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
      blockManagerId: BlockManagerId): Boolean = {
    val metricsWithStageIds = taskMetrics.flatMap { case (id, metrics) =>
      taskIdToTaskSetId.get(id)
        .flatMap(activeTaskSets.get)
        .map(_.stageId)
        .map(stageId => (id, stageId, metrics))
    }
    dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
  }

  def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
    taskSetManager.handleTaskGettingResult(tid)
  }
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -21,6 +21,8 @@

import org.apache.spark.TaskState.TaskState
import org.apache.spark.util.{SerializableBuffer, Utils}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId

private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -23,15 +23,18 @@ import akka.actor.{Actor, ActorRef, Props}

import org.apache.spark.{Logging, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.executor.{TaskMetrics, Executor, ExecutorBackend}
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.storage.BlockManagerId

private case class ReviveOffers()

private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)

private case class KillTask(taskId: Long, interruptThread: Boolean)

private case class StopExecutor()

/**
 * Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on
 * LocalBackend asynchronous, which is necessary to prevent deadlock between LocalBackend
@@ -63,6 +66,9 @@ private[spark] class LocalActor(

    case KillTask(taskId, interruptThread) =>
      executor.killTask(taskId, interruptThread)

    case StopExecutor =>
      executor.stop()
  }

  def reviveOffers() {
@@ -91,6 +97,7 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
  }

  override def stop() {
    localActor ! StopExecutor
  }

  override def reviveOffers() {