diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1dd0715918042..7caaa91e1af97 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -986,13 +986,15 @@ private[spark] class Client( * @param appId ID of the application to monitor. * @param returnOnRunning Whether to also return the application state when it is RUNNING. * @param logApplicationReport Whether to log details of the application report every iteration. + * @param interval How often to poll the YARN RM for application status (in ms). * @return A pair of the yarn application state and the final application state. */ def monitorApplication( appId: ApplicationId, returnOnRunning: Boolean = false, - logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = { - val interval = sparkConf.get(REPORT_INTERVAL) + logApplicationReport: Boolean = true, + interval: Long = sparkConf.get(REPORT_INTERVAL)): + (YarnApplicationState, FinalApplicationStatus) = { var lastState: YarnApplicationState = null while (true) { Thread.sleep(interval) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index d4108caab28c1..187803cc6050b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -127,7 +127,7 @@ package object config { .stringConf .createOptional - /* Cluster-mode launcher configuration. */ + /* Launcher configuration. 
*/ private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion") .doc("In cluster mode, whether to wait for the application to finish before exiting the " + @@ -136,10 +136,16 @@ package object config { .createWithDefault(true) private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.yarn.report.interval") - .doc("Interval between reports of the current app status in cluster mode.") + .doc("Interval between reports of the current app status.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1s") + private[spark] val CLIENT_LAUNCH_MONITOR_INTERVAL = + ConfigBuilder("spark.yarn.clientLaunchMonitorInterval") + .doc("Interval between requests for the status of the client mode AM when starting the app.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") + /* Shared Client-mode AM / Driver configuration. */ private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 60da356ad14aa..d482376d14dd7 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil} +import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.launcher.SparkAppHandle import org.apache.spark.scheduler.TaskSchedulerImpl @@ -77,8 +78,11 @@ private[spark] class YarnClientSchedulerBackend( * This assumes both `client` and `appId` have already been set. 
*/ private def waitForApplication(): Unit = { + val monitorInterval = conf.get(CLIENT_LAUNCH_MONITOR_INTERVAL) + assert(client != null && appId.isDefined, "Application has not been submitted yet!") - val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true) // blocking + val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true, + interval = monitorInterval) // blocking if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {