From f644b55f518551e01206bbe05ae4492327b68902 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 6 Aug 2014 13:45:00 -0700 Subject: [PATCH 1/5] Use more specific service name --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 9d4edeb6d96cf..c52dd42f3d10e 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -146,9 +146,9 @@ object SparkEnv extends Logging { } val securityManager = new SecurityManager(conf) - - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf, - securityManager = securityManager) + val actorSystemName = if (isDriver) "driver actor" else "executor actor" + val (actorSystem, boundPort) = AkkaUtils.createActorSystem( + actorSystemName, hostname, port, conf, securityManager) // Figure out which port Akka actually bound to in case the original port is 0 or occupied. // This is so that we tell the executors the correct port to connect to. From 1c1b42e08f2bc3cb0da68724522bc4a906894032 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 6 Aug 2014 14:23:27 -0700 Subject: [PATCH 2/5] Avoid spaces in akka system name --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 2 +- core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index c52dd42f3d10e..ef78df63ef6c6 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -146,7 +146,7 @@ object SparkEnv extends Logging { } val securityManager = new SecurityManager(conf) - val actorSystemName = if (isDriver) "driver actor" else "executor actor" + val actorSystemName = if (isDriver) "driver-actor" else "executor-actor" val (actorSystem, boundPort) = AkkaUtils.createActorSystem( actorSystemName, hostname, port, conf, securityManager) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index d6afb73b74242..01b326b2093be 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -53,7 +53,7 @@ private[spark] object AkkaUtils extends Logging { val startService: Int => (ActorSystem, Int) = { actualPort => doCreateActorSystem(name, host, actualPort, conf, securityManager) } - Utils.startServiceOnPort(port, startService, name) + Utils.startServiceOnPort(port, startService, name.replaceAll("-", " ")) } private def doCreateActorSystem( From c8c6a6271ad4a9dd3edb005a6b900b3295ee729b Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 7 Aug 2014 19:18:43 -0700 Subject: [PATCH 3/5] Do not include hyphens in actor name Hopefully this allows the tests to run succesfully... --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 2 +- core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index ef78df63ef6c6..68d7b65433603 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -146,7 +146,7 @@ object SparkEnv extends Logging { } val securityManager = new SecurityManager(conf) - val actorSystemName = if (isDriver) "driver-actor" else "executor-actor" + val actorSystemName = if (isDriver) "driverActor" else "executorActor" val (actorSystem, boundPort) = AkkaUtils.createActorSystem( actorSystemName, hostname, port, conf, securityManager) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 01b326b2093be..d6afb73b74242 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -53,7 +53,7 @@ private[spark] object AkkaUtils extends Logging { val startService: Int => (ActorSystem, Int) = { actualPort => doCreateActorSystem(name, host, actualPort, conf, securityManager) } - Utils.startServiceOnPort(port, startService, name.replaceAll("-", " ")) + Utils.startServiceOnPort(port, startService, name) } private def doCreateActorSystem( From 3a92843efc206b688cb0ca1f2a9c8be67c922f65 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 13 Aug 2014 19:15:13 -0700 Subject: [PATCH 4/5] Change actor name to sparkDriver and sparkExecutor Apparently we also hard-code the actor names everywhere, so the scope of this change is a little larger than expected. --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 2 +- .../apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 2 +- .../spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 2 +- .../scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 2 +- core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 2 +- .../spark/streaming/receiver/ReceiverSupervisorImpl.scala | 2 +- .../scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 2 +- .../org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 2 +- .../scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala | 2 +- .../org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 2 +- 10 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 4a00d981873dc..b116a353a04f6 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -146,7 +146,7 @@ object SparkEnv extends Logging { } val securityManager = new SecurityManager(conf) - val actorSystemName = if (isDriver) "driverActor" else "executorActor" + val actorSystemName = if (isDriver) "sparkDriver" else "sparkExecutor" val (actorSystem, boundPort) = AkkaUtils.createActorSystem( actorSystemName, hostname, port, conf, securityManager) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index d99c76117c168..cae33b0c368c6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -38,7 +38,7 @@ private[spark] class SimrSchedulerBackend( override def start() { super.start() - val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 589dba2e40d20..3eed24b8bc8ae 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -42,7 +42,7 @@ private[spark] class SparkDeploySchedulerBackend( super.start() // The endpoint for executors to talk to us - val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( conf.get("spark.driver.host"), conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 9f45400bcf852..ad62629e517ba 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -130,7 +130,7 @@ private[spark] class CoarseMesosSchedulerBackend( } val command = CommandInfo.newBuilder() .setEnvironment(environment) - val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( conf.get("spark.driver.host"), conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index d6afb73b74242..d12bfee465761 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -195,7 +195,7 @@ private[spark] object AkkaUtils extends Logging { val driverHost: String = conf.get("spark.driver.host", "localhost") val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost, "Expected hostname") - val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name" + val url = s"akka.tcp://sparkDriver@$driverHost:$driverPort/user/$name" val timeout = AkkaUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index d934b9cbfc3e8..6db0d0cdd0f0b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -56,7 +56,7 @@ private[streaming] class ReceiverSupervisorImpl( private val trackerActor = { val ip = env.conf.get("spark.driver.host", "localhost") val port = env.conf.getInt("spark.driver.port", 7077) - val url = "akka.tcp://spark@%s:%s/user/ReceiverTracker".format(ip, port) + val url = "akka.tcp://sparkDriver@%s:%s/user/ReceiverTracker".format(ip, port) env.actorSystem.actorSelection(url) } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 184e2ad6c82cd..3580b8b0f6ebe 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -229,7 +229,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp sparkConf.set("spark.driver.host", driverHost) sparkConf.set("spark.driver.port", driverPort.toString) - val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 80e0162e9f277..eff882ca31175 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -245,7 +245,7 @@ private[yarn] class YarnAllocationHandler( // Deallocate + allocate can result in reusing id's wrongly - so use a different counter // (executorIdCounter) val executorId = executorIdCounter.incrementAndGet().toString - val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index fc7b8320d734d..531efd6096d99 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -193,7 +193,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp sparkConf.set("spark.driver.host", driverHost) sparkConf.set("spark.driver.port", driverPort.toString) - val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 29ccec2adcac3..aa0498087ed64 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -262,7 +262,7 @@ private[yarn] class YarnAllocationHandler( numExecutorsRunning.decrementAndGet() } else { val executorId = executorIdCounter.incrementAndGet().toString - val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) From 8c459ed2b0d3fcddf0ebf8a9a78db437e405c8d8 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 13 Aug 2014 19:25:44 -0700 Subject: [PATCH 5/5] Use a common variable for driver/executor actor system names --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 5 ++++- .../spark/scheduler/cluster/SimrSchedulerBackend.scala | 8 +++++--- .../cluster/SparkDeploySchedulerBackend.scala | 8 +++++--- .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 5 +++-- .../main/scala/org/apache/spark/util/AkkaUtils.scala | 5 +++-- .../streaming/receiver/ReceiverSupervisorImpl.scala | 10 +++++----- .../apache/spark/deploy/yarn/ExecutorLauncher.scala | 9 ++++++--- .../spark/deploy/yarn/YarnAllocationHandler.scala | 8 +++++--- .../apache/spark/deploy/yarn/ExecutorLauncher.scala | 9 ++++++--- .../spark/deploy/yarn/YarnAllocationHandler.scala | 5 +++-- 10 files changed, 45 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index b116a353a04f6..eee95c26e78b1 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -111,6 +111,9 @@ object SparkEnv extends Logging { private val env = new ThreadLocal[SparkEnv] @volatile private var lastSetSparkEnv : SparkEnv = _ + private[spark] val driverActorSystemName = "sparkDriver" + private[spark] val executorActorSystemName = "sparkExecutor" + def set(e: SparkEnv) { lastSetSparkEnv = e env.set(e) @@ -146,7 +149,7 @@ object SparkEnv extends Logging { } val securityManager = new SecurityManager(conf) - val actorSystemName = if (isDriver) "sparkDriver" else "sparkExecutor" + val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName val (actorSystem, boundPort) = AkkaUtils.createActorSystem( actorSystemName, hostname, port, conf, securityManager) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index cae33b0c368c6..4f7133c4bc17c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, FileSystem} -import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.{Logging, SparkContext, SparkEnv} import org.apache.spark.scheduler.TaskSchedulerImpl private[spark] class SimrSchedulerBackend( @@ -38,8 +38,10 @@ private[spark] class SimrSchedulerBackend( override def start() { super.start() - val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( - sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"), + val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + SparkEnv.driverActorSystemName, + sc.conf.get("spark.driver.host"), + sc.conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) val conf = new Configuration() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 3eed24b8bc8ae..32138e5246700 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler.cluster -import org.apache.spark.{Logging, SparkConf, SparkContext} +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv} import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.client.{AppClient, AppClientListener} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl} @@ -42,8 +42,10 @@ private[spark] class SparkDeploySchedulerBackend( super.start() // The endpoint for executors to talk to us - val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( - conf.get("spark.driver.host"), conf.get("spark.driver.port"), + val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + SparkEnv.driverActorSystemName, + conf.get("spark.driver.host"), + conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}") val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index ad62629e517ba..f0172504c55aa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -28,7 +28,7 @@ import org.apache.mesos.{Scheduler => MScheduler} import org.apache.mesos._ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} -import org.apache.spark.{Logging, SparkContext, SparkException} +import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -130,7 +130,8 @@ private[spark] class CoarseMesosSchedulerBackend( } val command = CommandInfo.newBuilder() .setEnvironment(environment) - val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + SparkEnv.driverActorSystemName, conf.get("spark.driver.host"), conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index d12bfee465761..e2d32c859bbda 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -27,7 +27,7 @@ import akka.pattern.ask import com.typesafe.config.ConfigFactory import org.apache.log4j.{Level, Logger} -import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException} /** * Various utility classes for working with Akka. @@ -192,10 +192,11 @@ private[spark] object AkkaUtils extends Logging { } def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = { + val driverActorSystemName = SparkEnv.driverActorSystemName val driverHost: String = conf.get("spark.driver.host", "localhost") val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost, "Expected hostname") - val url = s"akka.tcp://sparkDriver@$driverHost:$driverPort/user/$name" + val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name" val timeout = AkkaUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 6db0d0cdd0f0b..53a3e6200e340 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -20,22 +20,21 @@ package org.apache.spark.streaming.receiver import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicLong -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import scala.collection.mutable.ArrayBuffer import scala.concurrent.Await import akka.actor.{Actor, Props} import akka.pattern.ask +import com.google.common.base.Throwables + import org.apache.spark.{Logging, SparkEnv} -import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.scheduler._ import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.scheduler.DeregisterReceiver import org.apache.spark.streaming.scheduler.AddBlock -import scala.Some import org.apache.spark.streaming.scheduler.RegisterReceiver -import com.google.common.base.Throwables /** * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]] @@ -56,7 +55,8 @@ private[streaming] class ReceiverSupervisorImpl( private val trackerActor = { val ip = env.conf.get("spark.driver.host", "localhost") val port = env.conf.getInt("spark.driver.port", 7077) - val url = "akka.tcp://sparkDriver@%s:%s/user/ReceiverTracker".format(ip, port) + val url = "akka.tcp://%s@%s:%s/user/ReceiverTracker".format( + SparkEnv.driverActorSystemName, ip, port) env.actorSystem.actorSelection(url) } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 3580b8b0f6ebe..2d49bf68881c1 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ -import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter @@ -229,8 +229,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp sparkConf.set("spark.driver.host", driverHost) sparkConf.set("spark.driver.port", driverPort.toString) - val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( - driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) + val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + SparkEnv.driverActorSystemName, + driverHost, + driverPort.toString, + CoarseGrainedSchedulerBackend.ACTOR_NAME) actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index eff882ca31175..568a6ef932bbd 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -26,7 +26,7 @@ import scala.collection import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.{Logging, SparkConf, SparkEnv} import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -245,8 +245,10 @@ private[yarn] class YarnAllocationHandler( // Deallocate + allocate can result in reusing id's wrongly - so use a different counter // (executorIdCounter) val executorId = executorIdCounter.incrementAndGet().toString - val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( - sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), + val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + SparkEnv.driverActorSystemName, + sparkConf.get("spark.driver.host"), + sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) logInfo("launching container on " + containerId + " host " + executorHostname) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 531efd6096d99..7ef5d9766e9c4 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.conf.YarnConfiguration import akka.actor._ import akka.remote._ -import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter @@ -193,8 +193,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp sparkConf.set("spark.driver.host", driverHost) sparkConf.set("spark.driver.port", driverPort.toString) - val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( - driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) + val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + SparkEnv.driverActorSystemName, + driverHost, + driverPort.toString, + CoarseGrainedSchedulerBackend.ACTOR_NAME) actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index aa0498087ed64..0a461749c819d 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -26,7 +26,7 @@ import scala.collection import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.{Logging, SparkConf, SparkEnv} import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils @@ -262,7 +262,8 @@ private[yarn] class YarnAllocationHandler( numExecutorsRunning.decrementAndGet() } else { val executorId = executorIdCounter.incrementAndGet().toString - val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + SparkEnv.driverActorSystemName, sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME)