From 1727b38a1ba00e0ab9543a2da6262cb25736b588 Mon Sep 17 00:00:00 2001
From: Kostas Sakellis
Date: Mon, 5 Jan 2015 17:27:01 -0800
Subject: [PATCH] Renamed ExecutorDetails back to ExecutorInfo and other CR feedback

---
 ...stenerAdapter.java => JavaSparkListener.java} |  2 +-
 .../spark/deploy/master/ApplicationInfo.scala    | 14 +++++++-------
 .../{ExecutorInfo.scala => ExecutorDesc.scala}   |  4 ++--
 .../org/apache/spark/deploy/master/Master.scala  |  2 +-
 .../apache/spark/deploy/master/WorkerInfo.scala  |  6 +++---
 .../spark/deploy/master/ui/ApplicationPage.scala |  4 ++--
 .../apache/spark/scheduler/SparkListener.scala   |  8 ++++----
 .../spark/scheduler/cluster/ExecutorData.scala   |  2 +-
 ...{ExecutorDetails.scala => ExecutorInfo.scala} |  2 +-
 .../org/apache/spark/util/JsonProtocol.scala     | 16 ++++++++--------
 .../SparkListenerWithClusterSuite.scala          | 16 ++++++++--------
 .../apache/spark/util/JsonProtocolSuite.scala    | 16 ++++++++--------
 12 files changed, 46 insertions(+), 46 deletions(-)
 rename core/src/main/java/org/apache/spark/{SparkListenerAdapter.java => JavaSparkListener.java} (98%)
 rename core/src/main/scala/org/apache/spark/deploy/master/{ExecutorInfo.scala => ExecutorDesc.scala} (95%)
 rename core/src/main/scala/org/apache/spark/scheduler/cluster/{ExecutorDetails.scala => ExecutorInfo.scala} (97%)
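For reviewers, a minimal sketch of how the renamed public pieces are meant to be consumed after this change; the class name ExecutorHostLogger is illustrative only and not part of this patch:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
    import org.apache.spark.scheduler.cluster.ExecutorInfo

    // Reads the renamed executorInfo field carried by the executor-added event.
    class ExecutorHostLogger extends SparkListener {
      override def onExecutorAdded(added: SparkListenerExecutorAdded) {
        val info: ExecutorInfo = added.executorInfo
        println(s"Executor ${added.executorId} added on ${info.executorHost} (${info.totalCores} cores)")
      }
    }

SparkListenerExecutorRemoved carries the same executorInfo field, so a removal handler looks symmetric.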
diff --git a/core/src/main/java/org/apache/spark/SparkListenerAdapter.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java
similarity index 98%
rename from core/src/main/java/org/apache/spark/SparkListenerAdapter.java
rename to core/src/main/java/org/apache/spark/JavaSparkListener.java
index bbfe643975306..646496f313507 100644
--- a/core/src/main/java/org/apache/spark/SparkListenerAdapter.java
+++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -45,7 +45,7 @@
  * new events get added to both the SparkListener and this adapter
  * in lockstep.
  */
-public class SparkListenerAdapter implements SparkListener {
+public class JavaSparkListener implements SparkListener {
 
   @Override
   public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index ad7d81747c377..ede0a9dbefb8d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -38,8 +38,8 @@ private[spark] class ApplicationInfo(
   extends Serializable {
 
   @transient var state: ApplicationState.Value = _
-  @transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
-  @transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _
+  @transient var executors: mutable.HashMap[Int, ExecutorDesc] = _
+  @transient var removedExecutors: ArrayBuffer[ExecutorDesc] = _
   @transient var coresGranted: Int = _
   @transient var endTime: Long = _
   @transient var appSource: ApplicationSource = _
@@ -55,12 +55,12 @@ private[spark] class ApplicationInfo(
 
   private def init() {
     state = ApplicationState.WAITING
-    executors = new mutable.HashMap[Int, ExecutorInfo]
+    executors = new mutable.HashMap[Int, ExecutorDesc]
     coresGranted = 0
     endTime = -1L
     appSource = new ApplicationSource(this)
     nextExecutorId = 0
-    removedExecutors = new ArrayBuffer[ExecutorInfo]
+    removedExecutors = new ArrayBuffer[ExecutorDesc]
   }
 
   private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -75,14 +75,14 @@ private[spark] class ApplicationInfo(
     }
   }
 
-  def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = {
-    val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
+  def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorDesc = {
+    val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
     executors(exec.id) = exec
     coresGranted += cores
     exec
   }
 
-  def removeExecutor(exec: ExecutorInfo) {
+  def removeExecutor(exec: ExecutorDesc) {
     if (executors.contains(exec.id)) {
       removedExecutors += executors(exec.id)
       executors -= exec.id
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
similarity index 95%
rename from core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
rename to core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
index d417070c51016..5d620dfcabad5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.master
 
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
 
-private[spark] class ExecutorInfo(
+private[spark] class ExecutorDesc(
     val id: Int,
     val application: ApplicationInfo,
     val worker: WorkerInfo,
@@ -37,7 +37,7 @@ private[spark] class ExecutorInfo(
 
   override def equals(other: Any): Boolean = {
     other match {
-      case info: ExecutorInfo =>
+      case info: ExecutorDesc =>
         fullId == info.fullId &&
         worker.id == info.worker.id &&
         cores == info.cores &&
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e8a5cfc746fed..c3c24e8b7a983 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -581,7 +581,7 @@ private[spark] class Master(
     }
   }
 
-  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
+  def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
     worker.addExecutor(exec)
     worker.actor ! LaunchExecutor(masterUrl,
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 473ddc23ff0f3..e94aae93e4495 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -38,7 +38,7 @@ private[spark] class WorkerInfo(
   Utils.checkHost(host, "Expected hostname")
   assert (port > 0)
 
-  @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // executorId => info
+  @transient var executors: mutable.HashMap[String, ExecutorDesc] = _ // executorId => info
   @transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info
   @transient var state: WorkerState.Value = _
   @transient var coresUsed: Int = _
@@ -70,13 +70,13 @@ private[spark] class WorkerInfo(
     host + ":" + port
   }
 
-  def addExecutor(exec: ExecutorInfo) {
+  def addExecutor(exec: ExecutorDesc) {
     executors(exec.fullId) = exec
     coresUsed += exec.cores
     memoryUsed += exec.memory
   }
 
-  def removeExecutor(exec: ExecutorInfo) {
+  def removeExecutor(exec: ExecutorDesc) {
     if (executors.contains(exec.fullId)) {
       executors -= exec.fullId
       coresUsed -= exec.cores
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 4588c130ef439..3aae2b95d7396 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -27,7 +27,7 @@ import org.json4s.JValue
 
 import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.master.ExecutorInfo
+import org.apache.spark.deploy.master.ExecutorDesc
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 import org.apache.spark.util.Utils
 
@@ -109,7 +109,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
     UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
   }
 
-  private def executorRow(executor: ExecutorInfo): Seq[Node] = {
+  private def executorRow(executor: ExecutorDesc): Seq[Node] = {
     <tr>
       <td>{executor.id}</td>
       <td>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index e668934389f7b..6bda8b00aca5b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
 
 import org.apache.spark.{Logging, TaskEndReason}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler.cluster.ExecutorDetails
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{Distribution, Utils}
@@ -86,11 +86,11 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerExecutorAdded(executorId: String, executorDetails: ExecutorDetails)
+case class SparkListenerExecutorAdded(executorId: String, executorInfo: ExecutorInfo)
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerExecutorRemoved(executorId: String, executorDetails: ExecutorDetails)
+case class SparkListenerExecutorRemoved(executorId: String, executorInfo: ExecutorInfo)
   extends SparkListenerEvent
 
 /**
@@ -119,7 +119,7 @@ private[spark] case object SparkListenerShutdown extends SparkListenerEvent
  * :: DeveloperApi ::
  * Interface for listening to events from the Spark scheduler. Note that this is an internal
  * interface which might change in different Spark releases. Java clients should extend
- * {@link SparkListenerAdapter}
+ * {@link JavaSparkListener}
  */
 @DeveloperApi
 trait SparkListener {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index 07897d1562659..eb52ddfb1eab1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -34,4 +34,4 @@ private[cluster] class ExecutorData(
     override val executorHost: String,
     var freeCores: Int,
     override val totalCores: Int
-) extends ExecutorDetails(executorHost, totalCores)
+) extends ExecutorInfo(executorHost, totalCores)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorDetails.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
similarity index 97%
rename from core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorDetails.scala
rename to core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
index 2af747e04846c..20fe645b65627 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorDetails.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
  * Stores information about an executor to pass from the scheduler to SparkListeners.
  */
 @DeveloperApi
-class ExecutorDetails(
+class ExecutorInfo(
     val executorHost: String,
     val totalCores: Int
 )
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index cce0584d35bc3..8faa10baf528a 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
 
 import java.util.{Properties, UUID}
 
-import org.apache.spark.scheduler.cluster.ExecutorDetails
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
@@ -202,13 +202,13 @@ private[spark] object JsonProtocol {
   def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
     ("Event" -> Utils.getFormattedClassName(executorAdded)) ~
     ("Executor ID" -> executorAdded.executorId) ~
-    ("Executor Info" -> executorInfoToJson(executorAdded.executorDetails))
+    ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
   }
 
   def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = {
     ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~
     ("Executor ID" -> executorRemoved.executorId) ~
-    ("Executor Info" -> executorInfoToJson(executorRemoved.executorDetails))
+    ("Executor Info" -> executorInfoToJson(executorRemoved.executorInfo))
   }
 
   /** ------------------------------------------------------------------- *
@@ -378,9 +378,9 @@ private[spark] object JsonProtocol {
     ("Disk Size" -> blockStatus.diskSize)
   }
 
-  def executorInfoToJson(executorDetails: ExecutorDetails): JValue = {
-    ("Host" -> executorDetails.executorHost) ~
-    ("Total Cores" -> executorDetails.totalCores)
+  def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
+    ("Host" -> executorInfo.executorHost) ~
+    ("Total Cores" -> executorInfo.totalCores)
   }
 
   /** ------------------------------ *
@@ -780,10 +780,10 @@ private[spark] object JsonProtocol {
     BlockStatus(storageLevel, memorySize, diskSize, tachyonSize)
   }
 
-  def executorInfoFromJson(json: JValue): ExecutorDetails = {
+  def executorInfoFromJson(json: JValue): ExecutorInfo = {
     val executorHost = (json \ "Host").extract[String]
     val totalCores = (json \ "Total Cores").extract[Int]
-    new ExecutorDetails(executorHost, totalCores)
+    new ExecutorInfo(executorHost, totalCores)
   }
 
   /** -------------------------------- *
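The JsonProtocol changes above rename only the Scala parameter names and types; the JSON keys ("Executor ID", "Executor Info", "Host", "Total Cores") are unchanged, so serialized event logs stay compatible. A minimal round-trip sketch, assuming it runs somewhere the private[spark] JsonProtocol is visible (e.g. a test under org.apache.spark) and using arbitrary example values:

    import org.apache.spark.scheduler.cluster.ExecutorInfo
    import org.apache.spark.util.JsonProtocol

    // Serialize an ExecutorInfo and parse it back, as testExecutorInfo below does.
    val info = new ExecutorInfo("host1.example.com", 8)
    val roundTripped = JsonProtocol.executorInfoFromJson(JsonProtocol.executorInfoToJson(info))
    assert(roundTripped.executorHost == info.executorHost)
    assert(roundTripped.totalCores == info.totalCores)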
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
index 93d0d37153b8c..623a687c359a2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import org.apache.spark.scheduler.cluster.ExecutorDetails
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.{SparkContext, LocalSparkContext}
 
 import org.scalatest.{FunSuite, BeforeAndAfter, BeforeAndAfterAll}
@@ -38,7 +38,7 @@ class SparkListenerWithClusterSuite extends FunSuite with LocalSparkContext
   }
 
   test("SparkListener sends executor added message") {
-    val listener = new SaveExecutorDetails
+    val listener = new SaveExecutorInfo
     sc.addSparkListener(listener)
 
     val rdd1 = sc.parallelize(1 to 100, 4)
@@ -47,16 +47,16 @@ class SparkListenerWithClusterSuite extends FunSuite with LocalSparkContext
     rdd2.count()
 
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
-    assert(listener.addedExecutorDetails.size == 2)
-    assert(listener.addedExecutorDetails("0").totalCores == 1)
-    assert(listener.addedExecutorDetails("1").totalCores == 1)
+    assert(listener.addedExecutorInfo.size == 2)
+    assert(listener.addedExecutorInfo("0").totalCores == 1)
+    assert(listener.addedExecutorInfo("1").totalCores == 1)
   }
 
-  private class SaveExecutorDetails extends SparkListener {
-    val addedExecutorDetails = mutable.Map[String, ExecutorDetails]()
+  private class SaveExecutorInfo extends SparkListener {
+    val addedExecutorInfo = mutable.Map[String, ExecutorInfo]()
 
     override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
-      addedExecutorDetails(executor.executorId) = executor.executorDetails
+      addedExecutorInfo(executor.executorId) = executor.executorInfo
     }
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index d24a614535679..432a9c20cb4ba 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
 
 import java.util.Properties
 
-import org.apache.spark.scheduler.cluster.ExecutorDetails
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.shuffle.MetadataFetchFailedException
 
 import scala.collection.Map
@@ -71,9 +71,9 @@ class JsonProtocolSuite extends FunSuite {
     val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
     val applicationEnd = SparkListenerApplicationEnd(42L)
     val executorAdded = SparkListenerExecutorAdded("exec1",
-      new ExecutorDetails("Hostee.awesome.com", 11))
+      new ExecutorInfo("Hostee.awesome.com", 11))
     val executorRemoved = SparkListenerExecutorRemoved("exec2",
-      new ExecutorDetails("Hoster.awesome.com", 42))
+      new ExecutorInfo("Hoster.awesome.com", 42))
 
     testEvent(stageSubmitted, stageSubmittedJsonString)
     testEvent(stageCompleted, stageCompletedJsonString)
@@ -101,7 +101,7 @@ class JsonProtocolSuite extends FunSuite {
     testTaskMetrics(makeTaskMetrics(
       33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
-    testExecutorInfo(new ExecutorDetails("host", 43))
+    testExecutorInfo(new ExecutorInfo("host", 43))
 
     // StorageLevel
     testStorageLevel(StorageLevel.NONE)
@@ -311,7 +311,7 @@ class JsonProtocolSuite extends FunSuite {
     assert(blockId === newBlockId)
   }
 
-  private def testExecutorInfo(info: ExecutorDetails) {
+  private def testExecutorInfo(info: ExecutorInfo) {
     val newInfo = JsonProtocol.executorInfoFromJson(JsonProtocol.executorInfoToJson(info))
     assertEquals(info, newInfo)
   }
@@ -349,10 +349,10 @@ class JsonProtocolSuite extends FunSuite {
         assertEquals(e1.environmentDetails, e2.environmentDetails)
       case (e1: SparkListenerExecutorAdded, e2: SparkListenerExecutorAdded) =>
         assert(e1.executorId == e1.executorId)
-        assertEquals(e1.executorDetails, e2.executorDetails)
+        assertEquals(e1.executorInfo, e2.executorInfo)
      case (e1: SparkListenerExecutorRemoved, e2: SparkListenerExecutorRemoved) =>
        assert(e1.executorId == e1.executorId)
-        assertEquals(e1.executorDetails, e2.executorDetails)
+        assertEquals(e1.executorInfo, e2.executorInfo)
      case (e1, e2) =>
        assert(e1 === e2)
      case _ => fail("Events don't match in types!")
@@ -405,7 +405,7 @@ class JsonProtocolSuite extends FunSuite {
     assert(info1.accumulables === info2.accumulables)
   }
 
-  private def assertEquals(info1: ExecutorDetails, info2: ExecutorDetails) {
+  private def assertEquals(info1: ExecutorInfo, info2: ExecutorInfo) {
     assert(info1.executorHost == info2.executorHost)
     assert(info1.totalCores == info2.totalCores)
   }
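End-to-end, the renamed pieces compose the same way SparkListenerWithClusterSuite exercises them. A self-contained sketch; the object name and the "local-cluster[2,1,512]" master string (mirroring the suite's two-executor setup) are illustrative, not part of this patch:

    import scala.collection.mutable

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
    import org.apache.spark.scheduler.cluster.ExecutorInfo

    object ExecutorInfoDemo {
      def main(args: Array[String]) {
        // Collect one ExecutorInfo per executor id, like SaveExecutorInfo in the suite.
        val added = mutable.Map[String, ExecutorInfo]()
        val sc = new SparkContext("local-cluster[2,1,512]", "ExecutorInfoDemo")
        sc.addSparkListener(new SparkListener {
          override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
            added(executor.executorId) = executor.executorInfo
          }
        })
        sc.parallelize(1 to 100, 4).count()  // run a job so executors get used
        added.foreach { case (id, info) =>
          println(s"executor $id -> ${info.executorHost}, ${info.totalCores} cores")
        }
        sc.stop()
      }
    }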