Commit 1727b38

Renamed ExecutorDetails back to ExecutorInfo and other CR feedback

Kostas Sakellis committed Jan 8, 2015
1 parent 14fe78d
Showing 12 changed files with 46 additions and 46 deletions.
@@ -45,7 +45,7 @@
* new events get added to both the SparkListener and this adapter
* in lockstep.
*/
-public class SparkListenerAdapter implements SparkListener {
+public class JavaSparkListener implements SparkListener {

@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }
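The adapter exists so that Java code keeps compiling when new callbacks are added to the SparkListener trait: clients extend JavaSparkListener and override only the events they care about. A minimal sketch of that pattern (the ExecutorLogger class and its body are hypothetical; written in Scala for consistency with the rest of the diff):

import org.apache.spark.JavaSparkListener
import org.apache.spark.scheduler.SparkListenerExecutorAdded

// Hypothetical listener: every callback it does not override falls back
// to the adapter's empty implementation.
class ExecutorLogger extends JavaSparkListener {
  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
    println(s"Executor ${executorAdded.executorId} added on host " +
      executorAdded.executorInfo.executorHost)
  }
}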
@@ -38,8 +38,8 @@ private[spark] class ApplicationInfo(
extends Serializable {

@transient var state: ApplicationState.Value = _
-@transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
-@transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _
+@transient var executors: mutable.HashMap[Int, ExecutorDesc] = _
+@transient var removedExecutors: ArrayBuffer[ExecutorDesc] = _
@transient var coresGranted: Int = _
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _
@@ -55,12 +55,12 @@ private[spark] class ApplicationInfo(

private def init() {
state = ApplicationState.WAITING
-executors = new mutable.HashMap[Int, ExecutorInfo]
+executors = new mutable.HashMap[Int, ExecutorDesc]
coresGranted = 0
endTime = -1L
appSource = new ApplicationSource(this)
nextExecutorId = 0
-removedExecutors = new ArrayBuffer[ExecutorInfo]
+removedExecutors = new ArrayBuffer[ExecutorDesc]
}

private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -75,14 +75,14 @@ private[spark] class ApplicationInfo(
}
}

-def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = {
-val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
+def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorDesc = {
+val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
executors(exec.id) = exec
coresGranted += cores
exec
}

-def removeExecutor(exec: ExecutorInfo) {
+def removeExecutor(exec: ExecutorDesc) {
if (executors.contains(exec.id)) {
removedExecutors += executors(exec.id)
executors -= exec.id
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.master

import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}

-private[spark] class ExecutorInfo(
+private[spark] class ExecutorDesc(
val id: Int,
val application: ApplicationInfo,
val worker: WorkerInfo,
@@ -37,7 +37,7 @@ private[spark] class ExecutorInfo(

override def equals(other: Any): Boolean = {
other match {
-case info: ExecutorInfo =>
+case info: ExecutorDesc =>
fullId == info.fullId &&
worker.id == info.worker.id &&
cores == info.cores &&
@@ -581,7 +581,7 @@ private[spark] class Master(
}
}

-def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
+def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(masterUrl,
@@ -38,7 +38,7 @@ private[spark] class WorkerInfo(
Utils.checkHost(host, "Expected hostname")
assert (port > 0)

-@transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // executorId => info
+@transient var executors: mutable.HashMap[String, ExecutorDesc] = _ // executorId => info
@transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info
@transient var state: WorkerState.Value = _
@transient var coresUsed: Int = _
@@ -70,13 +70,13 @@ private[spark] class WorkerInfo(
host + ":" + port
}

-def addExecutor(exec: ExecutorInfo) {
+def addExecutor(exec: ExecutorDesc) {
executors(exec.fullId) = exec
coresUsed += exec.cores
memoryUsed += exec.memory
}

-def removeExecutor(exec: ExecutorInfo) {
+def removeExecutor(exec: ExecutorDesc) {
if (executors.contains(exec.fullId)) {
executors -= exec.fullId
coresUsed -= exec.cores
@@ -27,7 +27,7 @@ import org.json4s.JValue

import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.master.ExecutorInfo
+import org.apache.spark.deploy.master.ExecutorDesc
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.util.Utils

@@ -109,7 +109,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
}

-private def executorRow(executor: ExecutorInfo): Seq[Node] = {
+private def executorRow(executor: ExecutorDesc): Seq[Node] = {
<tr>
<td>{executor.id}</td>
<td>
@@ -25,7 +25,7 @@ import scala.collection.mutable
import org.apache.spark.{Logging, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler.cluster.ExecutorDetails
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{Distribution, Utils}

@@ -86,11 +86,11 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent

@DeveloperApi
-case class SparkListenerExecutorAdded(executorId: String, executorDetails: ExecutorDetails)
+case class SparkListenerExecutorAdded(executorId: String, executorInfo: ExecutorInfo)
extends SparkListenerEvent

@DeveloperApi
-case class SparkListenerExecutorRemoved(executorId: String, executorDetails: ExecutorDetails)
+case class SparkListenerExecutorRemoved(executorId: String, executorInfo: ExecutorInfo)
extends SparkListenerEvent

/**
@@ -119,7 +119,7 @@ private[spark] case object SparkListenerShutdown extends SparkListenerEvent
* :: DeveloperApi ::
* Interface for listening to events from the Spark scheduler. Note that this is an internal
* interface which might change in different Spark releases. Java clients should extend
-* {@link SparkListenerAdapter}
+* {@link JavaSparkListener}
*/
@DeveloperApi
trait SparkListener {
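After the rename, both executor events carry an executorInfo field of the public ExecutorInfo type. A minimal sketch of a Scala listener consuming the two events (the CoreTracker class and its bookkeeping are hypothetical, assuming the trait's default no-op implementations):

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

// Hypothetical listener that tracks the total cores of live executors.
class CoreTracker extends SparkListener {
  @volatile private var totalCores = 0
  override def onExecutorAdded(added: SparkListenerExecutorAdded): Unit =
    totalCores += added.executorInfo.totalCores
  override def onExecutorRemoved(removed: SparkListenerExecutorRemoved): Unit =
    totalCores -= removed.executorInfo.totalCores
}

It is registered like any other listener, e.g. sc.addSparkListener(new CoreTracker).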
@@ -34,4 +34,4 @@ private[cluster] class ExecutorData(
override val executorHost: String,
var freeCores: Int,
override val totalCores: Int
-) extends ExecutorDetails(executorHost, totalCores)
+) extends ExecutorInfo(executorHost, totalCores)
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
* Stores information about an executor to pass from the scheduler to SparkListeners.
*/
@DeveloperApi
-class ExecutorDetails(
+class ExecutorInfo(
val executorHost: String,
val totalCores: Int
)
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala (16 changes: 8 additions & 8 deletions)
@@ -19,7 +19,7 @@ package org.apache.spark.util

import java.util.{Properties, UUID}

-import org.apache.spark.scheduler.cluster.ExecutorDetails
+import org.apache.spark.scheduler.cluster.ExecutorInfo

import scala.collection.JavaConverters._
import scala.collection.Map
@@ -202,13 +202,13 @@ private[spark] object JsonProtocol {
def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
("Event" -> Utils.getFormattedClassName(executorAdded)) ~
("Executor ID" -> executorAdded.executorId) ~
("Executor Info" -> executorInfoToJson(executorAdded.executorDetails))
("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
}

def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = {
("Event" -> Utils.getFormattedClassName(executorRemoved)) ~
("Executor ID" -> executorRemoved.executorId) ~
("Executor Info" -> executorInfoToJson(executorRemoved.executorDetails))
("Executor Info" -> executorInfoToJson(executorRemoved.executorInfo))
}

/** ------------------------------------------------------------------- *
@@ -378,9 +378,9 @@ private[spark] object JsonProtocol {
("Disk Size" -> blockStatus.diskSize)
}

-def executorInfoToJson(executorDetails: ExecutorDetails): JValue = {
-("Host" -> executorDetails.executorHost) ~
-("Total Cores" -> executorDetails.totalCores)
+def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
+("Host" -> executorInfo.executorHost) ~
+("Total Cores" -> executorInfo.totalCores)
}

/** ------------------------------ *
@@ -780,10 +780,10 @@ private[spark] object JsonProtocol {
BlockStatus(storageLevel, memorySize, diskSize, tachyonSize)
}

-def executorInfoFromJson(json: JValue): ExecutorDetails = {
+def executorInfoFromJson(json: JValue): ExecutorInfo = {
val executorHost = (json \ "Host").extract[String]
val totalCores = (json \ "Total Cores").extract[Int]
-new ExecutorDetails(executorHost, totalCores)
+new ExecutorInfo(executorHost, totalCores)
}

/** -------------------------------- *
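executorInfoToJson and executorInfoFromJson are inverses, which the updated JsonProtocolSuite below exercises. A minimal round-trip sketch (values are arbitrary; JsonProtocol is private[spark], so code like this only compiles inside an org.apache.spark package, as the suite's does):

import org.apache.spark.scheduler.cluster.ExecutorInfo

// Serialize an ExecutorInfo to JSON and parse it back; ExecutorInfo does
// not define equals, so compare the fields directly.
val info = new ExecutorInfo("worker-1.example.com", 8)
val parsed = JsonProtocol.executorInfoFromJson(JsonProtocol.executorInfoToJson(info))
assert(parsed.executorHost == info.executorHost)
assert(parsed.totalCores == info.totalCores)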
@@ -17,7 +17,7 @@

package org.apache.spark.scheduler

-import org.apache.spark.scheduler.cluster.ExecutorDetails
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.{SparkContext, LocalSparkContext}

import org.scalatest.{FunSuite, BeforeAndAfter, BeforeAndAfterAll}
@@ -38,7 +38,7 @@ class SparkListenerWithClusterSuite extends FunSuite with LocalSparkContext
}

test("SparkListener sends executor added message") {
-val listener = new SaveExecutorDetails
+val listener = new SaveExecutorInfo
sc.addSparkListener(listener)

val rdd1 = sc.parallelize(1 to 100, 4)
@@ -47,16 +47,16 @@ class SparkListenerWithClusterSuite extends FunSuite with LocalSparkContext
rdd2.count()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
-assert(listener.addedExecutorDetails.size == 2)
-assert(listener.addedExecutorDetails("0").totalCores == 1)
-assert(listener.addedExecutorDetails("1").totalCores == 1)
+assert(listener.addedExecutorInfo.size == 2)
+assert(listener.addedExecutorInfo("0").totalCores == 1)
+assert(listener.addedExecutorInfo("1").totalCores == 1)
}

-private class SaveExecutorDetails extends SparkListener {
-val addedExecutorDetails = mutable.Map[String, ExecutorDetails]()
+private class SaveExecutorInfo extends SparkListener {
+val addedExecutorInfo = mutable.Map[String, ExecutorInfo]()

override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
-addedExecutorDetails(executor.executorId) = executor.executorDetails
+addedExecutorInfo(executor.executorId) = executor.executorInfo
}
}
}
@@ -19,7 +19,7 @@ package org.apache.spark.util

import java.util.Properties

-import org.apache.spark.scheduler.cluster.ExecutorDetails
+import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.shuffle.MetadataFetchFailedException

import scala.collection.Map
@@ -71,9 +71,9 @@ class JsonProtocolSuite extends FunSuite {
val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
val applicationEnd = SparkListenerApplicationEnd(42L)
val executorAdded = SparkListenerExecutorAdded("exec1",
new ExecutorDetails("Hostee.awesome.com", 11))
new ExecutorInfo("Hostee.awesome.com", 11))
val executorRemoved = SparkListenerExecutorRemoved("exec2",
new ExecutorDetails("Hoster.awesome.com", 42))
new ExecutorInfo("Hoster.awesome.com", 42))

testEvent(stageSubmitted, stageSubmittedJsonString)
testEvent(stageCompleted, stageCompletedJsonString)
@@ -101,7 +101,7 @@ class JsonProtocolSuite extends FunSuite {
testTaskMetrics(makeTaskMetrics(
33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
-testExecutorInfo(new ExecutorDetails("host", 43))
+testExecutorInfo(new ExecutorInfo("host", 43))

// StorageLevel
testStorageLevel(StorageLevel.NONE)
@@ -311,7 +311,7 @@ class JsonProtocolSuite extends FunSuite {
assert(blockId === newBlockId)
}

-private def testExecutorInfo(info: ExecutorDetails) {
+private def testExecutorInfo(info: ExecutorInfo) {
val newInfo = JsonProtocol.executorInfoFromJson(JsonProtocol.executorInfoToJson(info))
assertEquals(info, newInfo)
}
@@ -349,10 +349,10 @@ class JsonProtocolSuite extends FunSuite {
assertEquals(e1.environmentDetails, e2.environmentDetails)
case (e1: SparkListenerExecutorAdded, e2: SparkListenerExecutorAdded) =>
assert(e1.executorId == e1.executorId)
-assertEquals(e1.executorDetails, e2.executorDetails)
+assertEquals(e1.executorInfo, e2.executorInfo)
case (e1: SparkListenerExecutorRemoved, e2: SparkListenerExecutorRemoved) =>
assert(e1.executorId == e1.executorId)
-assertEquals(e1.executorDetails, e2.executorDetails)
+assertEquals(e1.executorInfo, e2.executorInfo)
case (e1, e2) =>
assert(e1 === e2)
case _ => fail("Events don't match in types!")
@@ -405,7 +405,7 @@ class JsonProtocolSuite extends FunSuite {
assert(info1.accumulables === info2.accumulables)
}

-private def assertEquals(info1: ExecutorDetails, info2: ExecutorDetails) {
+private def assertEquals(info1: ExecutorInfo, info2: ExecutorInfo) {
assert(info1.executorHost == info2.executorHost)
assert(info1.totalCores == info2.totalCores)
}
