Skip to content

Commit

Permalink
[SPARK-20529][CORE] Allow worker and master work with a proxy server
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

In the current codes, when worker connects to master, master will send its address to the worker. Then worker will save this address and use it to reconnect in case of failure. However, sometimes, this address is not correct. If there is a proxy between master and worker, the address master sent is not the address of proxy.

In this PR, the master address used by the worker will be sent to the master, then master just replies this address back, worker will use this address to reconnect in case of failure. In other words, the worker will use the config master address set in the worker side if possible rather than the master address set in the master side.

There is still one potential issue though. When a master is restarted or takes over leadership, the work will use the address sent from the master to connect. If there is still a proxy between  master and worker, the address may be wrong. However, there is no way to figure it out just in the worker.

## How was this patch tested?

The new added unit test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#17821 from zsxwing/SPARK-20529.
  • Loading branch information
zsxwing authored and Robert Kruszewski committed May 19, 2017
1 parent 59a0f39 commit 32c62a2
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 17 deletions.
27 changes: 23 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
import org.apache.spark.util.Utils

private[deploy] sealed trait DeployMessage extends Serializable
Expand All @@ -34,14 +34,25 @@ private[deploy] object DeployMessages {

// Worker to Master

/**
* @param id the worker id
* @param host the worker host
* @param port the worker post
* @param worker the worker endpoint ref
* @param cores the core number of worker
* @param memory the memory size of worker
* @param workerWebUiUrl the worker Web UI address
* @param masterAddress the master address used by the worker to connect
*/
case class RegisterWorker(
id: String,
host: String,
port: Int,
worker: RpcEndpointRef,
cores: Int,
memory: Int,
workerWebUiUrl: String)
workerWebUiUrl: String,
masterAddress: RpcAddress)
extends DeployMessage {
Utils.checkHost(host)
assert (port > 0)
Expand Down Expand Up @@ -80,8 +91,16 @@ private[deploy] object DeployMessages {

sealed trait RegisterWorkerResponse

case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
with RegisterWorkerResponse
/**
* @param master the master ref
* @param masterWebUiUrl the master Web UI address
* @param masterAddress the master address used by the worker to connect. It should be
* [[RegisterWorker.masterAddress]].
*/
case class RegisteredWorker(
master: RpcEndpointRef,
masterWebUiUrl: String,
masterAddress: RpcAddress) extends DeployMessage with RegisterWorkerResponse

case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ private[deploy] class Master(
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)

case RegisterWorker(id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
Expand All @@ -243,7 +244,7 @@ private[deploy] class Master(
workerRef, workerWebUiUrl)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
workerRef.send(RegisteredWorker(self, masterWebUiUrl))
workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
schedule()
} else {
val workerAddress = worker.endpoint.address
Expand Down
53 changes: 45 additions & 8 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,20 @@ private[deploy] class Worker(

private val testing: Boolean = sys.props.contains("spark.testing")
private var master: Option[RpcEndpointRef] = None

/**
* Whether to use the master address in `masterRpcAddresses` if possible. If it's disabled, Worker
* will just use the address received from Master.
*/
private val preferConfiguredMasterAddress =
conf.getBoolean("spark.worker.preferConfiguredMasterAddress", false)
/**
* The master address to connect in case of failure. When the connection is broken, worker will
* use this address to connect. This is usually just one of `masterRpcAddresses`. However, when
* a master is restarted or takes over leadership, it will be an address sent from master, which
* may not be in `masterRpcAddresses`.
*/
private var masterAddressToConnect: Option[RpcAddress] = None
private var activeMasterUrl: String = ""
private[worker] var activeMasterWebUiUrl : String = ""
private var workerWebUiUrl: String = ""
Expand Down Expand Up @@ -196,10 +210,19 @@ private[deploy] class Worker(
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}

private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
/**
* Change to use the new master.
*
* @param masterRef the new master ref
* @param uiUrl the new master Web UI address
* @param masterAddress the new master address which the worker should use to connect in case of
* failure
*/
private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String, masterAddress: RpcAddress) {
// activeMasterUrl it's a valid Spark url since we receive it from master.
activeMasterUrl = masterRef.address.toSparkURL
activeMasterWebUiUrl = uiUrl
masterAddressToConnect = Some(masterAddress)
master = Some(masterRef)
connected = true
if (conf.getBoolean("spark.ui.reverseProxy", false)) {
Expand Down Expand Up @@ -266,7 +289,8 @@ private[deploy] class Worker(
if (registerMasterFutures != null) {
registerMasterFutures.foreach(_.cancel(true))
}
val masterAddress = masterRef.address
val masterAddress =
if (preferConfiguredMasterAddress) masterAddressToConnect.get else masterRef.address
registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = {
try {
Expand Down Expand Up @@ -342,15 +366,27 @@ private[deploy] class Worker(
}

private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.send(RegisterWorker(workerId, host, port, self, cores, memory, workerWebUiUrl))
masterEndpoint.send(RegisterWorker(
workerId,
host,
port,
self,
cores,
memory,
workerWebUiUrl,
masterEndpoint.address))
}

private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
msg match {
case RegisteredWorker(masterRef, masterWebUiUrl) =>
logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
if (preferConfiguredMasterAddress) {
logInfo("Successfully registered with master " + masterAddress.toSparkURL)
} else {
logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
}
registered = true
changeMaster(masterRef, masterWebUiUrl)
changeMaster(masterRef, masterWebUiUrl, masterAddress)
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(SendHeartbeat)
Expand Down Expand Up @@ -419,7 +455,7 @@ private[deploy] class Worker(

case MasterChanged(masterRef, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
changeMaster(masterRef, masterWebUiUrl)
changeMaster(masterRef, masterWebUiUrl, masterRef.address)

val execs = executors.values.
map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
Expand Down Expand Up @@ -561,7 +597,8 @@ private[deploy] class Worker(
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (master.exists(_.address == remoteAddress)) {
if (master.exists(_.address == remoteAddress) ||
masterAddressToConnect.exists(_ == remoteAddress)) {
logInfo(s"$remoteAddress Disassociated !")
masterDisconnected()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy._
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}
import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEnv}

class MasterSuite extends SparkFunSuite
with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
Expand Down Expand Up @@ -447,8 +447,15 @@ class MasterSuite extends SparkFunSuite
}
})

master.self.send(
RegisterWorker("1", "localhost", 9999, fakeWorker, 10, 1024, "http://localhost:8080"))
master.self.send(RegisterWorker(
"1",
"localhost",
9999,
fakeWorker,
10,
1024,
"http://localhost:8080",
RpcAddress("localhost", 9999)))
val executors = (0 until 3).map { i =>
new ExecutorDescription(appId = i.toString, execId = i, 2, ExecutorState.RUNNING)
}
Expand All @@ -459,4 +466,37 @@ class MasterSuite extends SparkFunSuite
assert(killedDrivers.asScala.toList.sorted === List("0", "1", "2"))
}
}

test("SPARK-20529: Master should reply the address received from worker") {
val master = makeMaster()
master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
eventually(timeout(10.seconds)) {
val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
}

@volatile var receivedMasterAddress: RpcAddress = null
val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint {
override val rpcEnv: RpcEnv = master.rpcEnv

override def receive: PartialFunction[Any, Unit] = {
case RegisteredWorker(_, _, masterAddress) =>
receivedMasterAddress = masterAddress
}
})

master.self.send(RegisterWorker(
"1",
"localhost",
9999,
fakeWorker,
10,
1024,
"http://localhost:8080",
RpcAddress("localhost2", 10000)))

eventually(timeout(10.seconds)) {
assert(receivedMasterAddress === RpcAddress("localhost2", 10000))
}
}
}

0 comments on commit 32c62a2

Please sign in to comment.