Skip to content

Commit

Permalink
Removed the bug, when allocation heartbeat would not start from initi…
Browse files Browse the repository at this point in the history
…al value.
  • Loading branch information
zzvara committed May 14, 2015
1 parent 08bac63 commit 6120295
Showing 1 changed file with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ private[spark] class ApplicationMaster(
val initialAllocationInterval = math.min(heartbeatInterval,
sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))

var currentAllocationInterval = initialAllocationInterval
var nextAllocationInterval = initialAllocationInterval

// The number of failures in a row until Reporter thread give up
val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
Expand All @@ -333,25 +333,26 @@ private[spark] class ApplicationMaster(
if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
s"${failureCount} time(s) from Reporter thread.")
s"$failureCount time(s) from Reporter thread.")
} else {
logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e)
logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
}
}
}
try {
val numPendingAllocate = allocator.getNumPendingAllocate
val sleepInterval =
if (numPendingAllocate > 0) {
currentAllocationInterval =
math.min(heartbeatInterval, currentAllocationInterval * 2)
val currentAllocationInterval =
math.min(heartbeatInterval, nextAllocationInterval)
nextAllocationInterval *= 2
currentAllocationInterval
} else {
currentAllocationInterval = initialAllocationInterval
nextAllocationInterval = initialAllocationInterval
heartbeatInterval
}
logDebug(s"Number of pending allocations is ${numPendingAllocate}. " +
s"Sleeping for ${sleepInterval}.")
logDebug(s"Number of pending allocations is $numPendingAllocate. " +
s"Sleeping for $sleepInterval.")
Thread.sleep(sleepInterval)
} catch {
case e: InterruptedException =>
Expand All @@ -363,8 +364,8 @@ private[spark] class ApplicationMaster(
t.setDaemon(true)
t.setName("Reporter")
t.start()
logInfo(s"Started progress reporter thread with (heartbeat : ${heartbeatInterval}, " +
s"initial allocation : ${initialAllocationInterval}) intervals")
logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " +
s"initial allocation : $initialAllocationInterval) intervals")
t
}

Expand Down

0 comments on commit 6120295

Please sign in to comment.