From a2f745ca7a3d80b190197786bee4cec44b7d5f9d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 5 Aug 2014 13:52:55 -0700 Subject: [PATCH] Remove sendMessageReliablySync; callers can wait themselves. This makes the waiting more explicit. --- .../org/apache/spark/network/ConnectionManager.scala | 8 +------- .../scala/org/apache/spark/network/SenderTest.scala | 7 ++++++- .../apache/spark/storage/BlockManagerWorker.scala | 12 +++++++----- .../spark/network/ConnectionManagerSuite.scala | 6 +++--- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 9440bcbe009ba..e47e316599301 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -35,7 +35,6 @@ import scala.collection.mutable.SynchronizedQueue import scala.concurrent.{Await, ExecutionContext, Future, Promise} import scala.concurrent.duration._ import scala.language.postfixOps -import scala.util.Try import org.apache.spark._ import org.apache.spark.util.{SystemClock, Utils} @@ -849,11 +848,6 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, promise.future } - def sendMessageReliablySync(connectionManagerId: ConnectionManagerId, - message: Message): Try[Message] = { - Try(Await.result(sendMessageReliably(connectionManagerId, message), Duration.Inf)) - } - def onReceiveMessage(callback: (Message, ConnectionManagerId) => Option[Message]) { onReceiveCallback = callback } @@ -911,7 +905,7 @@ private[spark] object ConnectionManager { (0 until count).map(i => { val bufferMessage = Message.createBufferMessage(buffer.duplicate) - manager.sendMessageReliablySync(manager.id, bufferMessage) + Await.result(manager.sendMessageReliably(manager.id, bufferMessage), Duration.Inf) }) println("--------------------------") println() diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala index b8ea7c2cff9a2..ea2ad104ecae1 100644 --- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala +++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala @@ -20,6 +20,10 @@ package org.apache.spark.network import java.nio.ByteBuffer import org.apache.spark.{SecurityManager, SparkConf} +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.util.Try + private[spark] object SenderTest { def main(args: Array[String]) { @@ -51,7 +55,8 @@ private[spark] object SenderTest { val dataMessage = Message.createBufferMessage(buffer.duplicate) val startTime = System.currentTimeMillis /* println("Started timer at " + startTime) */ - val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) + val promise = manager.sendMessageReliably(targetConnectionManagerId, dataMessage) + val responseStr: String = Try(Await.result(promise, Duration.Inf)) .map { response => val buffer = response.asInstanceOf[BufferMessage].buffers(0) new String(buffer.array, "utf-8") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala index 35f99d7569fe8..bf002a42d5dc5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala @@ -23,7 +23,9 @@ import org.apache.spark.Logging import org.apache.spark.network._ import org.apache.spark.util.Utils -import scala.util.{Failure, Success} +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.util.{Try, Failure, Success} /** * A network interface for BlockManager. Each slave should have one @@ -117,8 +119,8 @@ private[spark] object BlockManagerWorker extends Logging { val connectionManager = blockManager.connectionManager val blockMessage = BlockMessage.fromPutBlock(msg) val blockMessageArray = new BlockMessageArray(blockMessage) - val resultMessage = connectionManager.sendMessageReliablySync( - toConnManagerId, blockMessageArray.toBufferMessage) + val resultMessage = Try(Await.result(connectionManager.sendMessageReliably( + toConnManagerId, blockMessageArray.toBufferMessage), Duration.Inf)) resultMessage.isSuccess } @@ -127,8 +129,8 @@ private[spark] object BlockManagerWorker extends Logging { val connectionManager = blockManager.connectionManager val blockMessage = BlockMessage.fromGetBlock(msg) val blockMessageArray = new BlockMessageArray(blockMessage) - val responseMessage = connectionManager.sendMessageReliablySync( - toConnManagerId, blockMessageArray.toBufferMessage) + val responseMessage = Try(Await.result(connectionManager.sendMessageReliably( + toConnManagerId, blockMessageArray.toBufferMessage), Duration.Inf)) responseMessage match { case Success(message) => { val bufferMessage = message.asInstanceOf[BufferMessage] diff --git a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala index 03f36fe50d9c4..32d7fadb95b00 100644 --- a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala @@ -47,7 +47,7 @@ class ConnectionManagerSuite extends FunSuite { buffer.flip val bufferMessage = Message.createBufferMessage(buffer.duplicate) - manager.sendMessageReliablySync(manager.id, bufferMessage) + Await.ready(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds) assert(receivedMessage == true) @@ -80,7 +80,7 @@ class ConnectionManagerSuite extends FunSuite { (0 until count).map(i => { val bufferMessage = Message.createBufferMessage(buffer.duplicate) - manager.sendMessageReliablySync(managerServer.id, bufferMessage) + Await.ready(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds) }) assert(numReceivedServerMessages == 10) @@ -119,7 +119,7 @@ class ConnectionManagerSuite extends FunSuite { val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte)) buffer.flip val bufferMessage = Message.createBufferMessage(buffer.duplicate) - manager.sendMessageReliablySync(managerServer.id, bufferMessage) + Await.result(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds) assert(numReceivedServerMessages == 0) assert(numReceivedMessages == 0)