
Commit: Stylistic fixes
sryza committed Aug 1, 2014
1 parent 0dae734 commit 3bda974
Showing 8 changed files with 28 additions and 16 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -22,7 +22,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.TaskScheduler

-case class Heartbeat(
+private[spark] case class Heartbeat(
    executorId: String,
    taskMetrics: Array[(Long, TaskMetrics)],
    blockManagerId: BlockManagerId)
@@ -31,7 +31,7 @@ case class Heartbeat(
/**
 * Lives in the driver to receive heartbeats from executors..
 */
-class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor {
+private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor {
  override def receive = {
    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
      sender ! scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)
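Note: the two changes above tighten visibility with Scala's package-qualified access modifier. A minimal, self-contained sketch (toy packages, not Spark's) of how private[X] behaves:

package spark {
  package executor {
    // Visible anywhere under the enclosing package `spark`, hidden outside it.
    private[spark] case class Heartbeat(executorId: String)
  }

  object Driver {
    val h = executor.Heartbeat("exec-1") // compiles: we are inside `spark`
  }
}

package client {
  object App {
    // val h = spark.executor.Heartbeat("exec-1") // would not compile:
    // Heartbeat is private to package `spark`
  }
}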
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -160,7 +160,9 @@ class DAGScheduler(
   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
-  def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, Int, TaskMetrics)],
+  def executorHeartbeatReceived(
+      execId: String,
+      taskMetrics: Array[(Long, Int, TaskMetrics)],
      blockManagerId: BlockManagerId): Boolean = {
    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
    implicit val timeout = Timeout(30 seconds)
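Note: this rewrite applies the Spark style convention for declarations that overflow one line: break after the opening parenthesis, give each parameter its own line with four extra spaces of indent, and keep the result type on the closing line. A toy, self-contained illustration with hypothetical names and plain types in place of Spark's:

def recordHeartbeat(
    executorId: String,
    taskMetrics: Array[(Long, Int, String)],
    hostPort: String): Boolean = {
  // Treat a heartbeat as valid only if it identifies its sender and host.
  executorId.nonEmpty && hostPort.nonEmpty
}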
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -75,8 +75,11 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent

-case class SparkListenerExecutorMetricsUpdate(execId: String,
-    taskMetrics: Seq[(Long, Int, TaskMetrics)]) extends SparkListenerEvent
+@DeveloperApi
+case class SparkListenerExecutorMetricsUpdate(
+    execId: String,
+    taskMetrics: Seq[(Long, Int, TaskMetrics)])
+  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
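Note: besides the reformat, this hunk adds the @DeveloperApi annotation that the neighboring events already carry, flagging the event as a lower-stability API intended for Spark developers rather than end users. A self-contained sketch of how listener code typically pattern-matches such events (toy event hierarchy, not Spark's):

sealed trait ListenerEvent
case class ExecutorMetricsUpdate(execId: String, taskMetrics: Seq[(Long, Int, String)])
  extends ListenerEvent
case class UnpersistRDD(rddId: Int) extends ListenerEvent

def onEvent(event: ListenerEvent): Unit = event match {
  case ExecutorMetricsUpdate(execId, metrics) =>
    println(s"executor $execId sent ${metrics.size} task metric updates")
  case UnpersistRDD(rddId) =>
    println(s"RDD $rddId was unpersisted")
}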
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -328,9 +328,11 @@ private[spark] class TaskSchedulerImpl(
   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
-  override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
+  override def executorHeartbeatReceived(
+      execId: String,
+      taskMetrics: Array[(Long, TaskMetrics)],
      blockManagerId: BlockManagerId): Boolean = {
-    val metricsWithStageIds = taskMetrics.flatMap{
+    val metricsWithStageIds = taskMetrics.flatMap {
      case (id, metrics) => {
        taskIdToTaskSetId.get(id)
          .flatMap(activeTaskSets.get)
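Note: the second change in this hunk is the spacing fix from flatMap{ to flatMap {. The surrounding code is the flatMap-over-Option idiom: each lookup yields an Option, and flatMap flattens the misses away. The same pattern on plain collections, as a self-contained sketch with stand-in data:

val taskIdToStageId = Map(1L -> 10, 2L -> 20) // hypothetical lookup table
val taskMetrics = Array((1L, "metrics-a"), (3L, "metrics-b"))

// Task 3 has no stage mapping, so it simply drops out of the result.
val metricsWithStageIds = taskMetrics.flatMap {
  case (id, metrics) => taskIdToStageId.get(id).map(stageId => (id, stageId, metrics))
}
// metricsWithStageIds == Array((1L, 10, "metrics-a"))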
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -21,8 +21,6 @@ import java.nio.ByteBuffer

import org.apache.spark.TaskState.TaskState
import org.apache.spark.util.{SerializableBuffer, Utils}
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.storage.BlockManagerId

private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -113,7 +113,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
        new StageUIData
      })
    stageData.numActiveTasks += 1
-    stageData.taskData.put(taskInfo.taskId, new UIData.TaskUIData(taskInfo))
+    stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo))
  }
}

@@ -160,14 +160,17 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
      updateAggregateMetrics(stageData, info.executorId, metrics.get, oldMetrics)
    }

-    val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new UIData.TaskUIData(info))
+    val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info))
    taskData.taskInfo = info
    taskData.taskMetrics = metrics
    taskData.errorMessage = errorMessage
  }
}

-  def updateAggregateMetrics(stageData: StageUIData, execId: String, taskMetrics: TaskMetrics,
+  def updateAggregateMetrics(
+      stageData: StageUIData,
+      execId: String,
+      taskMetrics: TaskMetrics,
      oldMetrics: Option[TaskMetrics]) {
    val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary)

@@ -205,7 +208,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
          new StageUIData
        })
      val taskData = stageData.taskData.get(taskId)
-      taskData.map{ t =>
+      taskData.map { t =>
        if (!t.taskInfo.finished) {
          updateAggregateMetrics(stageData, executorMetricsUpdate.execId, taskMetrics,
            t.taskMetrics)
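Note: both taskData lookups above use mutable.HashMap.getOrElseUpdate, which returns the value cached for a key or inserts and returns the supplied default. A self-contained sketch of the idiom, with a toy record in place of TaskUIData:

import scala.collection.mutable

class TaskRecord(var status: String, var error: Option[String] = None)

val taskData = mutable.HashMap[Long, TaskRecord]()
val record = taskData.getOrElseUpdate(42L, new TaskRecord("started"))
record.status = "finished"                 // mutates the instance held in the map
assert(taskData(42L).status == "finished") // same object, updated in place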
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -55,7 +55,7 @@ private[jobs] object UIData {
  var executorSummary = new HashMap[String, ExecutorSummary]
}

-case class TaskUIData(
+class TaskUIData(
    var taskInfo: TaskInfo,
    var taskMetrics: Option[TaskMetrics] = None,
    var errorMessage: Option[String] = None)
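Note: a likely rationale for this change (my reading, the diff itself does not say) is that a record of var fields gains little from being a case class, and case-class structural equality becomes hazardous under mutation because hashCode changes with the fields. A self-contained sketch of the hazard:

case class MutableRecord(var x: Int)

val r = MutableRecord(1)
val seen = scala.collection.mutable.HashSet(r)
r.x = 2                   // hashCode changes along with the field
assert(!seen.contains(r)) // the set can no longer find the element it holds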
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -142,8 +142,12 @@ private[spark] object AkkaUtils extends Logging {
   * Send a message to the given actor and get its result within a default timeout, or
   * throw a SparkException if this fails.
   */
-  def askWithReply[T](message: Any, actor: ActorRef, retryAttempts: Int,
-      retryInterval: Int, timeout: FiniteDuration): T = {
+  def askWithReply[T](
+      message: Any,
+      actor: ActorRef,
+      retryAttempts: Int,
+      retryInterval: Int,
+      timeout: FiniteDuration): T = {
    // TODO: Consider removing multiple attempts
    if (actor == null) {
      throw new SparkException("Error sending message as driverActor is null " +
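Note: askWithReply wraps Akka's ask pattern in a bounded retry loop (the TODO above questions whether the retries should stay). A minimal sketch of that shape under the signature shown; this is an assumed reconstruction, not Spark's actual body:

import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout

def askWithRetries[T](
    message: Any,
    actor: ActorRef,
    retryAttempts: Int,
    retryInterval: Int,
    timeout: FiniteDuration): T = {
  var lastException: Exception = null
  for (_ <- 1 to retryAttempts) {
    try {
      val future = actor.ask(message)(Timeout(timeout))
      return Await.result(future, timeout).asInstanceOf[T]
    } catch {
      case e: Exception =>
        lastException = e
        Thread.sleep(retryInterval) // back off before the next attempt
    }
  }
  throw new RuntimeException(s"No reply after $retryAttempts attempts", lastException)
}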
