From 6120295268509d517357750df1c0b8588b1ee03d Mon Sep 17 00:00:00 2001 From: ehnalis Date: Thu, 14 May 2015 20:49:30 +0200 Subject: [PATCH] Removed the bug, when allocation heartbeat would not start from initial value. --- .../spark/deploy/yarn/ApplicationMaster.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 9934e3f9e115b..c7356966f7eb7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -307,7 +307,7 @@ private[spark] class ApplicationMaster( val initialAllocationInterval = math.min(heartbeatInterval, sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms")) - var currentAllocationInterval = initialAllocationInterval + var nextAllocationInterval = initialAllocationInterval // The number of failures in a row until Reporter thread give up val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5) @@ -333,9 +333,9 @@ private[spark] class ApplicationMaster( if (!NonFatal(e) || failureCount >= reporterMaxFailures) { finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " + - s"${failureCount} time(s) from Reporter thread.") + s"$failureCount time(s) from Reporter thread.") } else { - logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e) + logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e) } } } @@ -343,15 +343,16 @@ private[spark] class ApplicationMaster( val numPendingAllocate = allocator.getNumPendingAllocate val sleepInterval = if (numPendingAllocate > 0) { - currentAllocationInterval = - math.min(heartbeatInterval, currentAllocationInterval * 2) + val currentAllocationInterval = + math.min(heartbeatInterval, nextAllocationInterval) + nextAllocationInterval *= 2 currentAllocationInterval } else { - currentAllocationInterval = initialAllocationInterval + nextAllocationInterval = initialAllocationInterval heartbeatInterval } - logDebug(s"Number of pending allocations is ${numPendingAllocate}. " + - s"Sleeping for ${sleepInterval}.") + logDebug(s"Number of pending allocations is $numPendingAllocate. " + + s"Sleeping for $sleepInterval.") Thread.sleep(sleepInterval) } catch { case e: InterruptedException => @@ -363,8 +364,8 @@ private[spark] class ApplicationMaster( t.setDaemon(true) t.setName("Reporter") t.start() - logInfo(s"Started progress reporter thread with (heartbeat : ${heartbeatInterval}, " + - s"initial allocation : ${initialAllocationInterval}) intervals") + logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " + + s"initial allocation : $initialAllocationInterval) intervals") t }