diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 3f2e9b2b19e72..0571454c01dae 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -77,12 +77,12 @@ public void testKafkaStream() throws InterruptedException {
     testSuite.createTopic(topic);
     HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
     testSuite.produceAndSendMessage(topic,
-        JavaConverters.asScalaMapConverter(tmp).asScala().toMap(
+        JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
           Predef.<Tuple2<String, Object>>conforms()));
 
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put("zookeeper.connect", testSuite.zkConnect());
-    kafkaParams.put("group.id", "test-consumer-" + testSuite.random().nextInt(10000));
+    kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
     kafkaParams.put("auto.offset.reset", "smallest");
 
     JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index a697230d62582..c0b55e9340253 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -37,16 +37,17 @@ import org.apache.zookeeper.server.NIOServerCnxnFactory
 
 import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class KafkaStreamSuite extends TestSuiteBase {
-  import KafkaStreamSuite._
+  import KafkaTestUtils._
 
   val zkConnect = "localhost:2181"
   val zkConnectionTimeout = 6000
   val zkSessionTimeout = 6000
 
   val brokerPort = 9092
-  val brokerProps = getBrokerConfig(brokerPort)
+  val brokerProps = getBrokerConfig(brokerPort, zkConnect)
   val brokerConf = new KafkaConfig(brokerProps)
 
   protected var zookeeper: EmbeddedZookeeper = _
@@ -76,7 +77,7 @@ class KafkaStreamSuite extends TestSuiteBase {
   override def afterFunction() {
     producer.close()
     server.shutdown()
-    brokerConf.logDirs.foreach { f => deleteDir(new File(f)) }
+    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
 
     zkClient.close()
     zookeeper.shutdown()
@@ -119,25 +120,6 @@ class KafkaStreamSuite extends TestSuiteBase {
     ssc.stop()
   }
 
-  private def getBrokerConfig(port: Int): Properties = {
-    val props = new Properties()
-    props.put("broker.id", "0")
-    props.put("host.name", "localhost")
-    props.put("port", port.toString)
-    props.put("log.dir", createTmpDir().getAbsolutePath)
-    props.put("zookeeper.connect", zkConnect)
-    props.put("log.flush.interval.messages", "1")
-    props.put("replica.socket.timeout.ms", "1500")
-    props
-  }
-
-  private def getProducerConfig(brokerList: String): Properties = {
-    val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
-    props.put("serializer.class", classOf[StringEncoder].getName)
-    props
-  }
-
   private def createTestMessage(topic: String, sent: Map[String, Int])
     : Seq[KeyedMessage[String, String]] = {
     val messages = for ((s, freq) <- sent; i <- 0 until freq) yield {
@@ -161,25 +143,26 @@ class KafkaStreamSuite extends TestSuiteBase {
   }
 }
 
-object KafkaStreamSuite {
+object KafkaTestUtils {
   val random = new Random()
 
-  def createTmpDir(): File = {
-    val tmp = System.getProperty("java.io.tmpdir")
-    val f = new File(tmp, "spark-kafka-" + random.nextInt(10000))
-    f.mkdirs()
-    f
+  def getBrokerConfig(port: Int, zkConnect: String): Properties = {
+    val props = new Properties()
+    props.put("broker.id", "0")
+    props.put("host.name", "localhost")
+    props.put("port", port.toString)
+    props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+    props.put("zookeeper.connect", zkConnect)
+    props.put("log.flush.interval.messages", "1")
+    props.put("replica.socket.timeout.ms", "1500")
+    props
   }
 
-  def deleteDir(file: File) {
-    if (file.isFile) {
-      file.delete()
-    } else {
-      for (f <- file.listFiles()) {
-        deleteDir(f)
-      }
-      file.delete()
-    }
+  def getProducerConfig(brokerList: String): Properties = {
+    val props = new Properties()
+    props.put("metadata.broker.list", brokerList)
+    props.put("serializer.class", classOf[StringEncoder].getName)
+    props
   }
 
   def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
@@ -202,25 +185,25 @@ object KafkaStreamSuite {
         TopicAndPartition(topic, partition))), timeout),
       s"Partition [$topic, $partition] metadata not propagated after timeout")
   }
-}
 
-class EmbeddedZookeeper(val zkConnect: String) {
-  val random = new Random()
-  val snapshotDir = KafkaStreamSuite.createTmpDir()
-  val logDir = KafkaStreamSuite.createTmpDir()
+  class EmbeddedZookeeper(val zkConnect: String) {
+    val random = new Random()
+    val snapshotDir = Utils.createTempDir()
+    val logDir = Utils.createTempDir()
 
-  val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
-  val(ip, port) = {
-    val splits = zkConnect.split(":")
-    (splits(0), splits(1).toInt)
-  }
-  val factory = new NIOServerCnxnFactory()
-  factory.configure(new InetSocketAddress(ip, port), 16)
-  factory.startup(zookeeper)
-
-  def shutdown() {
-    factory.shutdown()
-    KafkaStreamSuite.deleteDir(snapshotDir)
-    KafkaStreamSuite.deleteDir(logDir)
+    val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+    val (ip, port) = {
+      val splits = zkConnect.split(":")
+      (splits(0), splits(1).toInt)
+    }
+    val factory = new NIOServerCnxnFactory()
+    factory.configure(new InetSocketAddress(ip, port), 16)
+    factory.startup(zookeeper)
+
+    def shutdown() {
+      factory.shutdown()
+      Utils.deleteRecursively(snapshotDir)
+      Utils.deleteRecursively(logDir)
+    }
   }
 }
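
Note: the deleted createTmpDir()/deleteDir() helpers are replaced by Spark's own Utils.createTempDir() and Utils.deleteRecursively(). As a rough standalone sketch of the behavior being delegated (not Spark's actual Utils implementation, which additionally registers created temp dirs for deletion on JVM shutdown), the equivalent logic looks like this:

    import java.io.File
    import java.util.UUID

    object TempDirSketch {
      // Unique temp dir under java.io.tmpdir, like the removed createTmpDir(),
      // but with a collision-free UUID suffix instead of random.nextInt(10000).
      def createTempDir(): File = {
        val dir = new File(System.getProperty("java.io.tmpdir"), "spark-kafka-" + UUID.randomUUID())
        require(dir.mkdirs(), s"Failed to create temp dir $dir")
        dir
      }

      // Depth-first recursive delete, like the removed deleteDir().
      def deleteRecursively(file: File): Unit = {
        if (file.isDirectory) {
          // listFiles() returns null on I/O error; treat that as empty.
          Option(file.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
        }
        file.delete()
      }

      def main(args: Array[String]): Unit = {
        val dir = createTempDir()
        new File(dir, "data.log").createNewFile()
        deleteRecursively(dir)
        println(s"cleaned up: ${!dir.exists()}")  // expected output: cleaned up: true
      }
    }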