Skip to content

Commit

Permalink
[SPARK-7533] [YARN] Decrease spacing between AM-RM heartbeats.
Browse files Browse the repository at this point in the history
Added faster RM-heartbeats on pending container allocations with multiplicative back-off.
Also updated the related documentation.
  • Loading branch information
zzvara committed May 12, 2015
1 parent f3e8e60 commit d4408c9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 16 deletions.
17 changes: 8 additions & 9 deletions docs/running-on-yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
<td>5000</td>
<td>
The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager.
If a higher value is provided, the interval is capped at half of the expiry interval in YARN's configuration (<code>yarn.am.liveness-monitor.expiry-interval-ms / 2</code>), so that the application master is not expired because of late reporting.
</td>
</tr>
<tr>
<td><code>spark.yarn.scheduler.allocation.interval-ms</code></td>
<td>200</td>
<td>
The interval in ms in which the Spark application master eagerly heartbeats to the YARN ResourceManager when there are pending container allocation requests. It should be no larger than <code>spark.yarn.scheduler.heartbeat.interval-ms</code>. The allocation interval doubles on each successive eager heartbeat while pending containers still exist, until <code>spark.yarn.scheduler.heartbeat.interval-ms</code> is reached.
</td>
</tr>
<tr>
Expand Down Expand Up @@ -220,15 +228,6 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
Otherwise, the client process will exit after submission.
</td>
</tr>
<tr>
<td><code>spark.yarn.executor.nodeLabelExpression</code></td>
<td>(none)</td>
<td>
A YARN node label expression that restricts the set of nodes executors will be scheduled on.
Only versions of YARN greater than or equal to 2.6 support node label expressions, so when
running against earlier versions, this property will be ignored.
</td>
</tr>
</table>

# Launching Spark on YARN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,14 @@ private[spark] class ApplicationMaster(
val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)

// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval =
sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "5s")
val heartbeatInterval = math.max(0, math.min(expiryInterval / 2,
sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "5s")))

// must be <= expiryInterval / 2.
val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
// we want to check more frequently for pending containers
val eagerAllocationInterval = math.min(heartbeatInterval,
sparkConf.getTimeAsMs("spark.yarn.scheduler.allocation.interval-ms", "200"))

var currentAllocationInterval = eagerAllocationInterval

// The number of failures in a row until Reporter thread give up
val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
Expand All @@ -331,14 +334,25 @@ private[spark] class ApplicationMaster(
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
s"${failureCount} time(s) from Reporter thread.")

} else {
logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e)
}
}
}
try {
Thread.sleep(interval)
val numPendingAllocate = allocator.getNumPendingAllocate
if (numPendingAllocate > 0) {
currentAllocationInterval =
math.min(heartbeatInterval,currentAllocationInterval * 2)
logDebug(s"Number of pending allocations is ${numPendingAllocate}. " +
"Sleeping for " + currentAllocationInterval)
Thread.sleep(currentAllocationInterval)
} else {
logDebug(s"Number of pending allocations is ${numPendingAllocate}. " +
"Sleeping for " + heartbeatInterval)
currentAllocationInterval = eagerAllocationInterval
Thread.sleep(heartbeatInterval)
}
} catch {
case e: InterruptedException =>
}
Expand All @@ -349,7 +363,8 @@ private[spark] class ApplicationMaster(
t.setDaemon(true)
t.setName("Reporter")
t.start()
logInfo("Started progress reporter thread - sleep time : " + interval)
logInfo("Started progress reporter thread with (heartbeat : " + heartbeatInterval +
", eager allocation : " + eagerAllocationInterval + ") intervals")
t
}

Expand Down

0 comments on commit d4408c9

Please sign in to comment.