Skip to content

Commit

Permalink
[SPARK-16019][YARN] Use separate RM poll interval when starting clien…
Browse files Browse the repository at this point in the history
…t AM.

Currently the code monitoring the launch of the client AM uses the value of
spark.yarn.report.interval as the interval for polling the RM; if someone
has that value to a really large interval, it would take that long to detect
that the client AM has started, which is not expected.

Instead, have a separate config for the interval to use when the client AM is
starting. The other config is still used in cluster mode, and to detect the
status of the client AM after it is already running.

Tested by running client and cluster mode apps with a modified value of
spark.yarn.report.interval, verifying client AM launch is detected before
that interval elapses.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #18380 from vanzin/SPARK-16019.
  • Loading branch information
Marcelo Vanzin committed Jul 11, 2017
1 parent ebc124d commit 1cad31f
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -986,13 +986,15 @@ private[spark] class Client(
* @param appId ID of the application to monitor.
* @param returnOnRunning Whether to also return the application state when it is RUNNING.
* @param logApplicationReport Whether to log details of the application report every iteration.
* @param interval How often to poll the YARN RM for application status (in ms).
* @return A pair of the yarn application state and the final application state.
*/
def monitorApplication(
appId: ApplicationId,
returnOnRunning: Boolean = false,
logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
val interval = sparkConf.get(REPORT_INTERVAL)
logApplicationReport: Boolean = true,
interval: Long = sparkConf.get(REPORT_INTERVAL)):
(YarnApplicationState, FinalApplicationStatus) = {
var lastState: YarnApplicationState = null
while (true) {
Thread.sleep(interval)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ package object config {
.stringConf
.createOptional

/* Cluster-mode launcher configuration. */
/* Launcher configuration. */

private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
.doc("In cluster mode, whether to wait for the application to finish before exiting the " +
Expand All @@ -136,10 +136,16 @@ package object config {
.createWithDefault(true)

private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.yarn.report.interval")
.doc("Interval between reports of the current app status in cluster mode.")
.doc("Interval between reports of the current app status.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")

private[spark] val CLIENT_LAUNCH_MONITOR_INTERVAL =
ConfigBuilder("spark.yarn.clientLaunchMonitorInterval")
.doc("Interval between requests for status the client mode AM when starting the app.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")

/* Shared Client-mode AM / Driver configuration. */

private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkAppHandle
import org.apache.spark.scheduler.TaskSchedulerImpl
Expand Down Expand Up @@ -77,8 +78,11 @@ private[spark] class YarnClientSchedulerBackend(
* This assumes both `client` and `appId` have already been set.
*/
private def waitForApplication(): Unit = {
val monitorInterval = conf.get(CLIENT_LAUNCH_MONITOR_INTERVAL)

assert(client != null && appId.isDefined, "Application has not been submitted yet!")
val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true) // blocking
val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true,
interval = monitorInterval) // blocking
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
Expand Down

0 comments on commit 1cad31f

Please sign in to comment.