From 3824ce390721c32610e6da90a5c08d4f4b68ad32 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Wed, 29 Apr 2015 09:39:09 -0500 Subject: [PATCH] [SPARK-2808][Streaming][Kafka] naming / comments per tdas --- .../spark/streaming/kafka/KafkaCluster.scala | 16 ++++++++-------- .../spark/streaming/kafka/KafkaTestUtils.scala | 1 + 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala index f67924f8863a0..194fcafd6bd93 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala @@ -233,9 +233,9 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def getConsumerOffsets( groupId: String, topicAndPartitions: Set[TopicAndPartition], - versionId: Short + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, Long]] = { - getConsumerOffsetMetadata(groupId, topicAndPartitions, versionId).right.map { r => + getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r => r.map { kv => kv._1 -> kv._2.offset } @@ -252,10 +252,10 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def getConsumerOffsetMetadata( groupId: String, topicAndPartitions: Set[TopicAndPartition], - versionId: Short + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = { var result = Map[TopicAndPartition, OffsetMetadataAndError]() - val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, versionId) + val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion) val errs = new Err withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer => val resp = consumer.fetchOffsets(req) @@ -289,12 +289,12 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def setConsumerOffsets( groupId: String, offsets: Map[TopicAndPartition, Long], - versionId: Short + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, Short]] = { val meta = offsets.map { kv => kv._1 -> OffsetAndMetadata(kv._2) } - setConsumerOffsetMetadata(groupId, meta, versionId) + setConsumerOffsetMetadata(groupId, meta, consumerApiVersion) } /** Requires Kafka >= 0.8.1.1 */ @@ -307,10 +307,10 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def setConsumerOffsetMetadata( groupId: String, metadata: Map[TopicAndPartition, OffsetAndMetadata], - versionId: Short + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, Short]] = { var result = Map[TopicAndPartition, Short]() - val req = OffsetCommitRequest(groupId, metadata, versionId) + val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion) val errs = new Err val topicAndPartitions = metadata.keySet withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer => diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 58774ee506eba..104f772fd76bb 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -229,6 +229,7 @@ private class KafkaTestUtils extends Logging { tryAgain(1) } + /** wait until the leader offset for the given topic / partition equals the specified offset */ def waitUntilLeaderOffset( kc: KafkaCluster, topic: String,