[SPARK-2037]: yarn client mode doesn't support spark.yarn.max.executor.failures

Author: GuoQiang Li <witgo@qq.com>

Closes apache#1180 from witgo/SPARK-2037 and squashes the following commits:

3d52411 [GuoQiang Li] review commit
7058f4d [GuoQiang Li] Correctly stop SparkContext
6d0561f [GuoQiang Li] Fix: yarn client mode doesn't support spark.yarn.max.executor.failures
witgo authored and conviva-zz committed Sep 4, 2014
1 parent 6f92119 commit ec3cc9c
Showing 3 changed files with 115 additions and 38 deletions.
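
The fix, in brief: in yarn-client mode the ExecutorLauncher previously kept requesting replacement containers no matter how many executors had already died; it now counts failures against `spark.yarn.max.executor.failures` (with the deprecated `spark.yarn.max.worker.failures` as a fallback) and unregisters with a FAILED status once the threshold is reached, as the cluster-mode ApplicationMaster already did. As a minimal sketch of how a driver might tune the new threshold — the app name and value here are illustrative, not part of the commit:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical yarn-client driver. The spark.yarn.max.executor.failures key
// (and its deprecated spark.yarn.max.worker.failures fallback) is what this
// commit teaches client mode to respect.
val conf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName("example-app")
  // Give up after 16 failed executors instead of the default
  // math.max(numExecutors * 2, 3).
  .set("spark.yarn.max.executor.failures", "16")

val sc = new SparkContext(conf)
```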

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

```diff
@@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
-import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
```

```diff
@@ -57,10 +56,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 
   private var yarnAllocator: YarnAllocationHandler = _
-  private var driverClosed:Boolean = false
+  private var driverClosed: Boolean = false
+  private var isFinished: Boolean = false
+  private var registered: Boolean = false
 
+  // Default to numExecutors * 2, with minimum of 3
+  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
+
   val securityManager = new SecurityManager(sparkConf)
-  val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+  val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf, securityManager = securityManager)._1
   var actor: ActorRef = _
```

```diff
@@ -97,23 +103,26 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
 
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-
-    // Compute number of threads for akka
-    val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
-    if (minimumMemory > 0) {
-      val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
-        YarnAllocationHandler.MEMORY_OVERHEAD)
-      val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
-      if (numCore > 0) {
-        // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
-        // TODO: Uncomment when hadoop is on a version which has this fixed.
-        // args.workerCores = numCore
+    synchronized {
+      if (!isFinished) {
+        val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+        // Compute number of threads for akka
+        val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+
+        if (minimumMemory > 0) {
+          val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+            YarnAllocationHandler.MEMORY_OVERHEAD)
+          val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+
+          if (numCore > 0) {
+            // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+            // TODO: Uncomment when hadoop is on a version which has this fixed.
+            // args.workerCores = numCore
+          }
+        }
+        registered = true
       }
     }
 
     waitForSparkMaster()
     addAmIpFilter()
     // Allocate all containers
```

```diff
@@ -243,11 +252,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
       yarnAllocator.allocateContainers(
         math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
+      checkNumExecutorsFailed()
       Thread.sleep(100)
     }
 
     logInfo("All executors have launched.")
-
   }
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+    }
+  }
 
   // TODO: We might want to extend this to allocate more containers in case they die !
```
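
With `checkNumExecutorsFailed()` called from both the initial allocation loop above and the reporter thread below, the client-mode launcher now fails fast: as soon as `getNumExecutorsFailed` reaches `maxNumExecutorFailures` it finishes the ApplicationMaster with `FinalApplicationStatus.FAILED` rather than requesting replacement containers indefinitely.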

```diff
@@ -257,6 +272,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val t = new Thread {
       override def run() {
         while (!driverClosed) {
+          checkNumExecutorsFailed()
           val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
           if (missingExecutorCount > 0) {
             logInfo("Allocating " + missingExecutorCount +
```

```diff
@@ -282,15 +298,23 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     yarnAllocator.allocateContainers(0)
   }
 
-  def finishApplicationMaster(status: FinalApplicationStatus) {
-
-    logInfo("finish ApplicationMaster with " + status)
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(appAttemptId)
-    finishReq.setFinishApplicationStatus(status)
-    finishReq.setTrackingUrl(sparkConf.get("spark.driver.appUIHistoryAddress", ""))
-    resourceManager.finishApplicationMaster(finishReq)
+  def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
+    synchronized {
+      if (isFinished) {
+        return
+      }
+      logInfo("Unregistering ApplicationMaster with " + status)
+      if (registered) {
+        val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+          .asInstanceOf[FinishApplicationMasterRequest]
+        finishReq.setAppAttemptId(appAttemptId)
+        finishReq.setFinishApplicationStatus(status)
+        finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
+        finishReq.setDiagnostics(appMessage)
+        resourceManager.finishApplicationMaster(finishReq)
+      }
+      isFinished = true
+    }
   }
 
 }
```
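
The reworked `finishApplicationMaster` is the heart of the correctness fix: it is idempotent (`isFinished` short-circuits repeated calls) and it only unregisters when registration actually happened (`registered`). A standalone sketch of the same run-once pattern — the class and method names here are illustrative, not Spark code:

```scala
// Run-once shutdown guard mirroring finishApplicationMaster above.
class ShutdownGuard {
  private var isFinished = false
  private var registered = false

  def markRegistered(): Unit = synchronized { registered = true }

  def finish(status: String, message: String = ""): Unit = synchronized {
    if (!isFinished) {      // second and later calls are no-ops
      if (registered) {     // never unregister what never registered
        println(s"unregistering with $status: $message")
      }
      isFinished = true
    }
  }
}
```

Registration and finish take the same lock, which closes the window where a failure check could fire while `run()` is still in the middle of registering.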

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

```diff
@@ -37,6 +37,8 @@ private[spark] class YarnClientSchedulerBackend(
 
   var client: Client = null
   var appId: ApplicationId = null
+  var checkerThread: Thread = null
+  var stopping: Boolean = false
 
   private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
       arrayBuf: ArrayBuffer[String]) {
```

```diff
@@ -86,6 +88,7 @@ private[spark] class YarnClientSchedulerBackend(
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()
+    checkerThread = yarnApplicationStateCheckerThread()
   }
 
   def waitForApp() {
```

```diff
@@ -116,7 +119,32 @@ private[spark] class YarnClientSchedulerBackend(
     }
   }
 
+  private def yarnApplicationStateCheckerThread(): Thread = {
+    val t = new Thread {
+      override def run() {
+        while (!stopping) {
+          val report = client.getApplicationReport(appId)
+          val state = report.getYarnApplicationState()
+          if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED
+            || state == YarnApplicationState.FAILED) {
+            logError(s"Yarn application already ended: $state")
+            sc.stop()
+            stopping = true
+          }
+          Thread.sleep(1000L)
+        }
+        checkerThread = null
+        Thread.currentThread().interrupt()
+      }
+    }
+    t.setName("Yarn Application State Checker")
+    t.setDaemon(true)
+    t.start()
+    t
+  }
+
   override def stop() {
+    stopping = true
     super.stop()
     client.stop
     logInfo("Stopped")
```
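
The checker thread above is a one-second polling daemon: if YARN reports the application as FINISHED, KILLED, or FAILED while the driver still believes it is running, the backend stops the `SparkContext` instead of letting the driver hang. A condensed, standalone sketch of the poll-until-terminal idea — only the `YarnApplicationState` values come from the diff; the helper itself is illustrative:

```scala
import org.apache.hadoop.yarn.api.records.YarnApplicationState

object YarnStatePoller {
  private val terminal = Set(
    YarnApplicationState.FINISHED,
    YarnApplicationState.KILLED,
    YarnApplicationState.FAILED)

  // fetchState stands in for
  // client.getApplicationReport(appId).getYarnApplicationState().
  def waitForTerminalState(fetchState: () => YarnApplicationState,
      stopping: () => Boolean): Option[YarnApplicationState] = {
    var result: Option[YarnApplicationState] = None
    while (result.isEmpty && !stopping()) {
      val state = fetchState()
      if (terminal.contains(state)) {
        result = Some(state)
      } else {
        Thread.sleep(1000L)
      }
    }
    result
  }
}
```

Marking the real thread a daemon (`t.setDaemon(true)`) keeps it from holding the driver JVM open, and setting `stopping = true` at the top of `stop()` lets a driver-initiated and a YARN-initiated shutdown converge on the same flag.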

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

```diff
@@ -19,15 +19,12 @@ package org.apache.spark.deploy.yarn
 
 import java.net.Socket
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
-import akka.actor.Terminated
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
```

```diff
@@ -57,10 +54,16 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 
   private var yarnAllocator: YarnAllocationHandler = _
-  private var driverClosed:Boolean = false
+  private var driverClosed: Boolean = false
+  private var isFinished: Boolean = false
+  private var registered: Boolean = false
 
   private var amClient: AMRMClient[ContainerRequest] = _
 
+  // Default to numExecutors * 2, with minimum of 3
+  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
+
   val securityManager = new SecurityManager(sparkConf)
   val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
     conf = sparkConf, securityManager = securityManager)._1
```

```diff
@@ -101,7 +104,12 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     amClient.start()
 
     appAttemptId = ApplicationMaster.getApplicationAttemptId()
-    registerApplicationMaster()
+    synchronized {
+      if (!isFinished) {
+        registerApplicationMaster()
+        registered = true
+      }
+    }
 
     waitForSparkMaster()
     addAmIpFilter()
```

```diff
@@ -210,6 +218,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     yarnAllocator.addResourceRequests(args.numExecutors)
     yarnAllocator.allocateResources()
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+      checkNumExecutorsFailed()
       allocateMissingExecutor()
       yarnAllocator.allocateResources()
       Thread.sleep(100)
```

```diff
@@ -228,12 +237,20 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     }
   }
 
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+    }
+  }
+
   private def launchReporterThread(_sleepTime: Long): Thread = {
     val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
 
     val t = new Thread {
       override def run() {
         while (!driverClosed) {
+          checkNumExecutorsFailed()
           allocateMissingExecutor()
           logDebug("Sending progress")
           yarnAllocator.allocateResources()
```

```diff
@@ -248,10 +265,18 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     t
   }
 
-  def finishApplicationMaster(status: FinalApplicationStatus) {
-    logInfo("Unregistering ApplicationMaster with " + status)
-    val trackingUrl = sparkConf.get("spark.driver.appUIHistoryAddress", "")
-    amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl)
+  def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
+    synchronized {
+      if (isFinished) {
+        return
+      }
+      logInfo("Unregistering ApplicationMaster with " + status)
+      if (registered) {
+        val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
+        amClient.unregisterApplicationMaster(status, appMessage, trackingUrl)
+      }
+      isFinished = true
+    }
   }
 
 }
```
