From c86a57f4d1a39ab9602733a09d8fec13506cc6d4 Mon Sep 17 00:00:00 2001 From: jinxing Date: Thu, 2 Feb 2017 23:18:16 -0800 Subject: [PATCH] [SPARK-19437] Rectify spark executor id in HeartbeatReceiverSuite. ## What changes were proposed in this pull request? The current code in `HeartbeatReceiverSuite`, executorId is set as below: ``` private val executorId1 = "executor-1" private val executorId2 = "executor-2" ``` The executorId is sent to driver when register as below: ``` test("expire dead hosts should kill executors with replacement (SPARK-8119)") { ... fakeSchedulerBackend.driverEndpoint.askSync[Boolean]( RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty)) ... } ``` Receiving `RegisterExecutor` in `CoarseGrainedSchedulerBackend`, the executorId will be compared with `currentExecutorIdCounter` as below: ``` case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) => if (executorDataMap.contains(executorId)) { executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) context.reply(true) } else { ... executorDataMap.put(executorId, data) if (currentExecutorIdCounter < executorId.toInt) { currentExecutorIdCounter = executorId.toInt } ... ``` `executorId.toInt` will cause NumberformatException. This unit test can pass currently because of `askWithRetry`, when catching exception, RPC will call again, thus it will go `if` branch and return true. **To fix** Rectify executorId and replace `askWithRetry` with `askSync`, refer to https://github.com/apache/spark/pull/16690 ## How was this patch tested? This fix is for unit test and no need to add another one.(If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Author: jinxing Closes #16779 from jinxing64/SPARK-19437. --- .../apache/spark/HeartbeatReceiverSuite.scala | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 7b6a2313f9e2a..88916488c0def 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -46,8 +46,8 @@ class HeartbeatReceiverSuite with PrivateMethodTester with LocalSparkContext { - private val executorId1 = "executor-1" - private val executorId2 = "executor-2" + private val executorId1 = "1" + private val executorId2 = "2" // Shared state that must be reset before and after each test private var scheduler: TaskSchedulerImpl = null @@ -93,12 +93,12 @@ class HeartbeatReceiverSuite test("task scheduler is set correctly") { assert(heartbeatReceiver.scheduler === null) - heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet) + heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet) assert(heartbeatReceiver.scheduler !== null) } test("normal heartbeat") { - heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet) + heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet) addExecutorAndVerify(executorId1) addExecutorAndVerify(executorId2) triggerHeartbeat(executorId1, executorShouldReregister = false) @@ -116,14 +116,14 @@ class HeartbeatReceiverSuite } test("reregister if heartbeat from unregistered executor") { - heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet) + heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet) // Received heartbeat from unknown executor, so we ask it to re-register triggerHeartbeat(executorId1, executorShouldReregister = true) assert(getTrackedExecutors.isEmpty) } test("reregister if heartbeat from removed executor") { - heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet) + heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet) addExecutorAndVerify(executorId1) addExecutorAndVerify(executorId2) // Remove the second executor but not the first @@ -140,7 +140,7 @@ class HeartbeatReceiverSuite test("expire dead hosts") { val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs()) - heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet) + heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet) addExecutorAndVerify(executorId1) addExecutorAndVerify(executorId2) triggerHeartbeat(executorId1, executorShouldReregister = false) @@ -149,7 +149,7 @@ class HeartbeatReceiverSuite heartbeatReceiverClock.advance(executorTimeout / 2) triggerHeartbeat(executorId1, executorShouldReregister = false) heartbeatReceiverClock.advance(executorTimeout) - heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts) + heartbeatReceiverRef.askSync[Boolean](ExpireDeadHosts) // Only the second executor should be expired as a dead host verify(scheduler).executorLost(Matchers.eq(executorId2), any()) val trackedExecutors = getTrackedExecutors @@ -173,11 +173,11 @@ class HeartbeatReceiverSuite val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv) val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1) val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2) - fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean]( + fakeSchedulerBackend.driverEndpoint.askSync[Boolean]( RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty)) - fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean]( + fakeSchedulerBackend.driverEndpoint.askSync[Boolean]( RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty)) - heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet) + heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet) addExecutorAndVerify(executorId1) addExecutorAndVerify(executorId2) triggerHeartbeat(executorId1, executorShouldReregister = false) @@ -195,7 +195,7 @@ class HeartbeatReceiverSuite // Here we use a timeout of O(seconds), but in practice this whole test takes O(10ms). val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs()) heartbeatReceiverClock.advance(executorTimeout * 2) - heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts) + heartbeatReceiverRef.askSync[Boolean](ExpireDeadHosts) val killThread = heartbeatReceiver.invokePrivate(_killExecutorThread()) killThread.shutdown() // needed for awaitTermination killThread.awaitTermination(10L, TimeUnit.SECONDS) @@ -213,7 +213,7 @@ class HeartbeatReceiverSuite executorShouldReregister: Boolean): Unit = { val metrics = TaskMetrics.empty val blockManagerId = BlockManagerId(executorId, "localhost", 12345) - val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse]( + val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( Heartbeat(executorId, Array(1L -> metrics.accumulators()), blockManagerId)) if (executorShouldReregister) { assert(response.reregisterBlockManager)