[SPARK-2886] Use more specific actor system name than "spark" #1810

Closed · wants to merge 6 commits
Changes from all commits
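For orientation, a minimal sketch (not part of this PR) of what the rename means for Akka actor paths; the host and port are placeholder values, and "CoarseGrainedScheduler" stands in for CoarseGrainedSchedulerBackend.ACTOR_NAME:

object ActorPathSketch extends App {
  val (host, port) = ("driver.example.com", 7077)
  // Before this change, driver and executor actor systems were both named "spark":
  val before = s"akka.tcp://spark@$host:$port/user/CoarseGrainedScheduler"
  // After it, the driver system is "sparkDriver" (executors get "sparkExecutor"),
  // so actor URLs and log lines say which side of the cluster they refer to:
  val after = s"akka.tcp://sparkDriver@$host:$port/user/CoarseGrainedScheduler"
  println(before)
  println(after)
}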
9 changes: 6 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -111,6 +111,9 @@ object SparkEnv extends Logging {
   private val env = new ThreadLocal[SparkEnv]
   @volatile private var lastSetSparkEnv : SparkEnv = _

+  private[spark] val driverActorSystemName = "sparkDriver"
+  private[spark] val executorActorSystemName = "sparkExecutor"
+
   def set(e: SparkEnv) {
     lastSetSparkEnv = e
     env.set(e)
@@ -146,9 +149,9 @@ object SparkEnv extends Logging {
     }

     val securityManager = new SecurityManager(conf)
-
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
-      securityManager = securityManager)
+    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+      actorSystemName, hostname, port, conf, securityManager)

     // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
     // This is so that we tell the executors the correct port to connect to.
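Every component that previously hardcoded "spark@" in a driver URL now has to consume these constants instead, which is what the remaining hunks in this PR do. A standalone sketch of the pattern, with assumed default values rather than a real configuration:

object DriverUrlSketch extends App {
  // Mirrors SparkEnv.driverActorSystemName; the host, port, and actor name
  // below are hypothetical example values.
  val driverActorSystemName = "sparkDriver"
  val (host, port, actorName) = ("localhost", 7077, "CoarseGrainedScheduler")
  val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
    driverActorSystemName, host, port, actorName)
  println(driverUrl) // akka.tcp://sparkDriver@localhost:7077/user/CoarseGrainedScheduler
}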
core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala

@@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}

-import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.{Logging, SparkContext, SparkEnv}
 import org.apache.spark.scheduler.TaskSchedulerImpl

 private[spark] class SimrSchedulerBackend(
@@ -38,8 +38,10 @@ private[spark] class SimrSchedulerBackend(
   override def start() {
     super.start()

-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"),
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      sc.conf.get("spark.driver.host"),
+      sc.conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)

     val conf = new Configuration()
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

@@ -17,7 +17,7 @@

 package org.apache.spark.scheduler.cluster

-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
@@ -42,8 +42,10 @@ private[spark] class SparkDeploySchedulerBackend(
     super.start()

     // The endpoint for executors to talk to us
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      conf.get("spark.driver.host"), conf.get("spark.driver.port"),
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      conf.get("spark.driver.host"),
+      conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
     val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
     val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

@@ -28,7 +28,7 @@ import org.apache.mesos.{Scheduler => MScheduler}
 import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}

-import org.apache.spark.{Logging, SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

@@ -130,7 +130,8 @@ private[spark] class CoarseMesosSchedulerBackend(
     }
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -27,7 +27,7 @@ import akka.pattern.ask
 import com.typesafe.config.ConfigFactory
 import org.apache.log4j.{Level, Logger}

-import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}

 /**
  * Various utility classes for working with Akka.
@@ -192,10 +192,11 @@ private[spark] object AkkaUtils extends Logging {
   }

   def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
+    val driverActorSystemName = SparkEnv.driverActorSystemName
     val driverHost: String = conf.get("spark.driver.host", "localhost")
     val driverPort: Int = conf.getInt("spark.driver.port", 7077)
     Utils.checkHost(driverHost, "Expected hostname")
-    val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
+    val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
     val timeout = AkkaUtils.lookupTimeout(conf)
     logInfo(s"Connecting to $name: $url")
     Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
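The same convention seen from the lookup side: a sketch of the URL makeDriverRef now resolves, assuming the default host and port it falls back to, with "MapOutputTracker" as an illustrative driver-side actor name:

object DriverRefUrlSketch extends App {
  val driverActorSystemName = "sparkDriver" // SparkEnv.driverActorSystemName
  val driverHost = "localhost"              // default for spark.driver.host
  val driverPort = 7077                     // default for spark.driver.port
  val name = "MapOutputTracker"             // illustrative actor name
  val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
  println(url) // akka.tcp://sparkDriver@localhost:7077/user/MapOutputTracker
}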
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

@@ -20,22 +20,21 @@ package org.apache.spark.streaming.receiver
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong

-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Await

 import akka.actor.{Actor, Props}
 import akka.pattern.ask

+import com.google.common.base.Throwables
+
 import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.storage.StreamBlockId
+import org.apache.spark.streaming.scheduler._
 import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.storage.StreamBlockId
-import org.apache.spark.streaming.scheduler.DeregisterReceiver
-import org.apache.spark.streaming.scheduler.AddBlock
-import scala.Some
-import org.apache.spark.streaming.scheduler.RegisterReceiver
-import com.google.common.base.Throwables

[Author comment on this hunk] just some import cleanups here

 /**
  * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -56,7 +55,8 @@ private[streaming] class ReceiverSupervisorImpl(
   private val trackerActor = {
     val ip = env.conf.get("spark.driver.host", "localhost")
     val port = env.conf.getInt("spark.driver.port", 7077)
-    val url = "akka.tcp://spark@%s:%s/user/ReceiverTracker".format(ip, port)
+    val url = "akka.tcp://%s@%s:%s/user/ReceiverTracker".format(
+      SparkEnv.driverActorSystemName, ip, port)
     env.actorSystem.actorSelection(url)
   }
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
@@ -229,8 +229,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)

-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      driverHost,
+      driverPort.toString,
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)

     actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
   }
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

@@ -26,7 +26,7 @@ import scala.collection
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
 import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -245,8 +245,10 @@ private[yarn] class YarnAllocationHandler(
     // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
     // (executorIdCounter)
     val executorId = executorIdCounter.incrementAndGet().toString
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"),
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      sparkConf.get("spark.driver.host"),
+      sparkConf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)

     logInfo("launching container on " + containerId + " host " + executorHostname)
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import akka.actor._
 import akka.remote._
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
@@ -193,8 +193,11 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)

-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      driverHost,
+      driverPort.toString,
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)

     actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
   }
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

@@ -26,7 +26,7 @@ import scala.collection
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
 import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -262,7 +262,8 @@ private[yarn] class YarnAllocationHandler(
       numExecutorsRunning.decrementAndGet()
     } else {
       val executorId = executorIdCounter.incrementAndGet().toString
-      val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
+      val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+        SparkEnv.driverActorSystemName,
         sparkConf.get("spark.driver.host"),
         sparkConf.get("spark.driver.port"),
         CoarseGrainedSchedulerBackend.ACTOR_NAME)