Skip to content

Commit

Permalink
[SPARK-2808][Streaming][Kafka] naming / comments per tdas
Browse files Browse the repository at this point in the history
  • Loading branch information
koeninger committed Apr 29, 2015
1 parent 61b3464 commit 3824ce3
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,9 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
def getConsumerOffsets(
groupId: String,
topicAndPartitions: Set[TopicAndPartition],
versionId: Short
consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, Long]] = {
getConsumerOffsetMetadata(groupId, topicAndPartitions, versionId).right.map { r =>
getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r =>
r.map { kv =>
kv._1 -> kv._2.offset
}
Expand All @@ -252,10 +252,10 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
def getConsumerOffsetMetadata(
groupId: String,
topicAndPartitions: Set[TopicAndPartition],
versionId: Short
consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
var result = Map[TopicAndPartition, OffsetMetadataAndError]()
val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, versionId)
val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion)
val errs = new Err
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp = consumer.fetchOffsets(req)
Expand Down Expand Up @@ -289,12 +289,12 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
def setConsumerOffsets(
groupId: String,
offsets: Map[TopicAndPartition, Long],
versionId: Short
consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, Short]] = {
val meta = offsets.map { kv =>
kv._1 -> OffsetAndMetadata(kv._2)
}
setConsumerOffsetMetadata(groupId, meta, versionId)
setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
}

/** Requires Kafka >= 0.8.1.1 */
Expand All @@ -307,10 +307,10 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
def setConsumerOffsetMetadata(
groupId: String,
metadata: Map[TopicAndPartition, OffsetAndMetadata],
versionId: Short
consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, Short]] = {
var result = Map[TopicAndPartition, Short]()
val req = OffsetCommitRequest(groupId, metadata, versionId)
val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
val errs = new Err
val topicAndPartitions = metadata.keySet
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ private class KafkaTestUtils extends Logging {
tryAgain(1)
}

/** wait until the leader offset for the given topic / partition equals the specified offset */
def waitUntilLeaderOffset(
kc: KafkaCluster,
topic: String,
Expand Down

0 comments on commit 3824ce3

Please sign in to comment.