diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index da1c8e8aa8667..183698ffe9304 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -149,6 +149,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   In cluster mode, use spark.driver.extraJavaOptions instead.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.maxAppAttempts</code></td>
+  <td><code>yarn.resourcemanager.am.max-attempts</code> in YARN</td>
+  <td>
+  The maximum number of attempts that will be made to submit the application.
+  It should be no larger than the global number of max attempts in the YARN configuration.
+  </td>
+</tr>
 </table>
 
 # Launching Spark on YARN
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9c77dff48dc8b..a8970cb949985 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -102,7 +102,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
           logInfo("Invoking sc stop from shutdown hook")
           sc.stop()
         }
-        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+        val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
         val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
 
         if (!finished) {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index addaddb711d3c..a2c3f918a1ab2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -98,6 +98,11 @@ private[spark] class Client(
     appContext.setQueue(args.amQueue)
     appContext.setAMContainerSpec(containerContext)
     appContext.setApplicationType("SPARK")
+    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
+      case Some(v) => appContext.setMaxAppAttempts(v)
+      case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
+          "Cluster's default value will be used.")
+    }
     val capability = Records.newRecord(classOf[Resource])
     capability.setMemory(args.amMemory + amMemoryOverhead)
     appContext.setResource(capability)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index bf4e15908bb46..e183efccbb6f7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -120,7 +120,10 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
   }
 
   /** Returns the maximum number of attempts to register the AM. */
-  def getMaxRegAttempts(conf: YarnConfiguration): Int =
-    conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
+  def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = {
+    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt).getOrElse(
+      yarnConf.getInt(
+        YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS))
+  }
 }
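
As a usage sketch, assuming this patch is applied: an application could opt into the new property programmatically through `SparkConf`. The app name and the value `2` below are illustrative assumptions, not anything mandated by the patch.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Ask YARN to attempt this application at most twice. Per the docs entry
// added above, the value should be no larger than the cluster-wide
// yarn.resourcemanager.am.max-attempts; leaving the property unset falls
// back to the cluster's default, as the Client.scala change logs.
val conf = new SparkConf()
  .setAppName("max-app-attempts-demo") // hypothetical app name
  .set("spark.yarn.maxAppAttempts", "2")
val sc = new SparkContext(conf)
```

The same effect should be achievable without code changes by passing `--conf spark.yarn.maxAppAttempts=2` to `spark-submit`.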