diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala index f67924f8863a0..194fcafd6bd93 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala @@ -233,9 +233,9 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def getConsumerOffsets( groupId: String, topicAndPartitions: Set[TopicAndPartition], - versionId: Short + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, Long]] = { - getConsumerOffsetMetadata(groupId, topicAndPartitions, versionId).right.map { r => + getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r => r.map { kv => kv._1 -> kv._2.offset } @@ -252,10 +252,10 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def getConsumerOffsetMetadata( groupId: String, topicAndPartitions: Set[TopicAndPartition], - versionId: Short + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = { var result = Map[TopicAndPartition, OffsetMetadataAndError]() - val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, versionId) + val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion) val errs = new Err withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer => val resp = consumer.fetchOffsets(req) @@ -289,12 +289,12 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def setConsumerOffsets( groupId: String, offsets: Map[TopicAndPartition, Long], - versionId: Short + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, Short]] = { val meta = offsets.map { kv => kv._1 -> OffsetAndMetadata(kv._2) } - setConsumerOffsetMetadata(groupId, meta, versionId) + setConsumerOffsetMetadata(groupId, meta, consumerApiVersion) } /** Requires Kafka >= 0.8.1.1 */ @@ -307,10 +307,10 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { def setConsumerOffsetMetadata( groupId: String, metadata: Map[TopicAndPartition, OffsetAndMetadata], - versionId: Short + consumerApiVersion: Short ): Either[Err, Map[TopicAndPartition, Short]] = { var result = Map[TopicAndPartition, Short]() - val req = OffsetCommitRequest(groupId, metadata, versionId) + val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion) val errs = new Err val topicAndPartitions = metadata.keySet withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer => diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 58774ee506eba..104f772fd76bb 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -229,6 +229,7 @@ private class KafkaTestUtils extends Logging { tryAgain(1) } + /** wait until the leader offset for the given topic / partition equals the specified offset */ def waitUntilLeaderOffset( kc: KafkaCluster, topic: String,