From 281589c5018b0cadbebc6f8cfac8a831dd40edeb Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Tue, 22 Jul 2014 22:55:35 -0700 Subject: [PATCH] Add a test case to BlockFetcherIteratorSuite.scala for fetching block from remote from successfully --- .../storage/BlockFetcherIteratorSuite.scala | 73 ++++++++++++++++--- 1 file changed, 64 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala index 4cc81572693a0..6adb9f51436d0 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala @@ -24,8 +24,10 @@ import org.mockito.Matchers.any import java.nio.ByteBuffer +import scala.collection.mutable.ArrayBuffer import scala.concurrent.future import scala.concurrent.ExecutionContext.Implicits.global + import org.apache.spark._ import org.apache.spark.storage.BlockFetcherIterator._ import org.apache.spark.network.{ConnectionManager, ConnectionManagerId, @@ -34,30 +36,29 @@ import org.apache.spark.network.{ConnectionManager, ConnectionManagerId, class BlockFetcherIteratorSuite extends FunSuite with Matchers { test("block fetch from remote fails using BasicBlockFetcherIterator") { - val conf = new SparkConf val blockManager = mock(classOf[BlockManager]) val connManager = mock(classOf[ConnectionManager]) - val message = Message.createBufferMessage(0) - message.hasError = true - val someMessage = Some(message) + when(blockManager.connectionManager).thenReturn(connManager) val f = future { + val message = Message.createBufferMessage(0) + message.hasError = true + val someMessage = Some(message) someMessage } - when(blockManager.connectionManager).thenReturn(connManager) when(connManager.sendMessageReliably(any(), any())).thenReturn(f) when(blockManager.futureExecContext).thenReturn(global) + when(blockManager.blockManagerId).thenReturn( BlockManagerId("test-client", "test-client", 1, 0)) when(blockManager.maxBytesInFlight).thenReturn(48 * 1024 * 1024) - val dummyBlId1 = ShuffleBlockId(0,0,0) - val dummyBlId2 = ShuffleBlockId(0,1,0) + val blId1 = ShuffleBlockId(0,0,0) + val blId2 = ShuffleBlockId(0,1,0) val bmId = BlockManagerId("test-server", "test-server",1 , 0) val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( - (bmId, Seq((dummyBlId1, 1))), - (bmId, Seq((dummyBlId2, 1))) + (bmId, Seq((blId1, 1), (blId2, 1))) ) val iterator = new BasicBlockFetcherIterator(blockManager, @@ -71,4 +72,58 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { } } + test("block fetch from remote succeed using BasicBlockFetcherIterator") { + val blockManager = mock(classOf[BlockManager]) + val connManager = mock(classOf[ConnectionManager]) + when(blockManager.connectionManager).thenReturn(connManager) + + val blId1 = ShuffleBlockId(0,0,0) + val blId2 = ShuffleBlockId(0,1,0) + val buf1 = ByteBuffer.allocate(4) + val buf2 = ByteBuffer.allocate(4) + buf1.putInt(1) + buf1.flip() + buf2.putInt(1) + buf2.flip() + val blockMessage1 = BlockMessage.fromGotBlock(GotBlock(blId1, buf1)) + val blockMessage2 = BlockMessage.fromGotBlock(GotBlock(blId2, buf2)) + val blockMessageArray = new BlockMessageArray( + Seq(blockMessage1, blockMessage2)) + + val bufferMessage = blockMessageArray.toBufferMessage + val buffer = ByteBuffer.allocate(bufferMessage.size) + val arrayBuffer = new ArrayBuffer[ByteBuffer] + bufferMessage.buffers.foreach{ b => + buffer.put(b) + } + buffer.flip + arrayBuffer += buffer + + val someMessage = Some(Message.createBufferMessage(arrayBuffer)) + + val f = future { + someMessage + } + when(connManager.sendMessageReliably(any(), + any())).thenReturn(f) + when(blockManager.futureExecContext).thenReturn(global) + + when(blockManager.blockManagerId).thenReturn( + BlockManagerId("test-client", "test-client", 1, 0)) + when(blockManager.maxBytesInFlight).thenReturn(48 * 1024 * 1024) + + val bmId = BlockManagerId("test-server", "test-server",1 , 0) + val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])]( + (bmId, Seq((blId1, 1), (blId2, 1))) + ) + + val iterator = new BasicBlockFetcherIterator(blockManager, + blocksByAddress, null) + iterator.initialize() + iterator.foreach{ + case (_, r) => { + (r.isDefined) should be(true) + } + } + } }