From 91562c64c2dd2b845b43d10e4b987a386b436f14 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Fri, 2 Jan 2015 19:34:42 +0800 Subject: [PATCH 1/4] add support for setting maxAppAttempts in the ApplicationSubmissionContext --- docs/running-on-yarn.md | 8 ++++++++ .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 5 +++++ .../scala/org/apache/spark/deploy/yarn/YarnRMClient.scala | 7 +++++-- 4 files changed, 19 insertions(+), 3 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index da1c8e8aa8667..6440c91807f9b 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -149,6 +149,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes In cluster mode, use spark.driver.extraJavaOptions instead. + + spark.yarn.amMaxAttempts + YARN default + + The maximum number of ApplicationMaster attempts. + It should not be larger than the global number set by resourcemanager. Otherwise, it will be override. + + # Launching Spark on YARN diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 9c77dff48dc8b..a8970cb949985 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -102,7 +102,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, logInfo("Invoking sc stop from shutdown hook") sc.stop() } - val maxAppAttempts = client.getMaxRegAttempts(yarnConf) + val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf) val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts if (!finished) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index addaddb711d3c..eaf60ee9607a6 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -98,6 +98,11 @@ private[spark] class Client( appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(containerContext) appContext.setApplicationType("SPARK") + sparkConf.getOption("spark.yarn.amMaxAttempts").map(_.toInt) match { + case Some(v) => appContext.setMaxAppAttempts(v) + case None => logDebug("spark.yarn.amMaxAttempts is not set. " + + "Cluster's default value will be used.") + } val capability = Records.newRecord(classOf[Resource]) capability.setMemory(args.amMemory + amMemoryOverhead) appContext.setResource(capability) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index bf4e15908bb46..1a932b5084334 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -120,7 +120,10 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg } /** Returns the maximum number of attempts to register the AM. */ - def getMaxRegAttempts(conf: YarnConfiguration): Int = - conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS) + def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = { + sparkConf.getOption("spark.yarn.amMaxAttempts").map(_.toInt).getOrElse( + yarnConf.getInt( + YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)) + } } From afdfc99e2722ac3a910de91dbf0c80972e7f7eb9 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Fri, 2 Jan 2015 19:39:16 +0800 Subject: [PATCH 2/4] more detailed description --- docs/running-on-yarn.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 6440c91807f9b..54d224ae9f4d0 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -151,7 +151,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes spark.yarn.amMaxAttempts - YARN default + yarn.resourcemanager.am.max-attempts in YARN The maximum number of ApplicationMaster attempts. It should not be larger than the global number set by resourcemanager. Otherwise, it will be override. From 202ac85103d6ca097e72f93dc5e158bea50def9f Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Wed, 7 Jan 2015 09:23:51 +0800 Subject: [PATCH 3/4] rephrase some --- docs/running-on-yarn.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 54d224ae9f4d0..97e2cf4fb42dc 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -153,8 +153,8 @@ Most of the configs are the same for Spark on YARN as for other deployment modes spark.yarn.amMaxAttempts yarn.resourcemanager.am.max-attempts in YARN - The maximum number of ApplicationMaster attempts. - It should not be larger than the global number set by resourcemanager. Otherwise, it will be override. + The maximum number of attempts that will be made to submit the application. + It should be no larger than the global number of max attempts in the YARN configuration. From 1416c83be860699dbeaa10eef4585b8ff7756e0b Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Wed, 7 Jan 2015 09:31:16 +0800 Subject: [PATCH 4/4] use the name spark.yarn.maxAppAttempts --- docs/running-on-yarn.md | 2 +- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 4 ++-- .../scala/org/apache/spark/deploy/yarn/YarnRMClient.scala | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 97e2cf4fb42dc..183698ffe9304 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -150,7 +150,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes - spark.yarn.amMaxAttempts + spark.yarn.maxAppAttempts yarn.resourcemanager.am.max-attempts in YARN The maximum number of attempts that will be made to submit the application. diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index eaf60ee9607a6..a2c3f918a1ab2 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -98,9 +98,9 @@ private[spark] class Client( appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(containerContext) appContext.setApplicationType("SPARK") - sparkConf.getOption("spark.yarn.amMaxAttempts").map(_.toInt) match { + sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match { case Some(v) => appContext.setMaxAppAttempts(v) - case None => logDebug("spark.yarn.amMaxAttempts is not set. " + + case None => logDebug("spark.yarn.maxAppAttempts is not set. " + "Cluster's default value will be used.") } val capability = Records.newRecord(classOf[Resource]) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 1a932b5084334..e183efccbb6f7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -121,7 +121,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg /** Returns the maximum number of attempts to register the AM. */ def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = { - sparkConf.getOption("spark.yarn.amMaxAttempts").map(_.toInt).getOrElse( + sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt).getOrElse( yarnConf.getInt( YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)) }