Skip to content

Commit

Permalink
[SPARK-4964] add comments per pwendell / dibbhatt
Browse files Browse the repository at this point in the history
  • Loading branch information
koeninger committed Feb 3, 2015
1 parent 8991017 commit 0df3ebe
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
// scalastyle:on

/** Requires Kafka >= 0.8.1.1 */
def getConsumerOffsets(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
Expand All @@ -232,6 +233,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
}
}

/** Requires Kafka >= 0.8.1.1 */
def getConsumerOffsetMetadata(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
Expand Down Expand Up @@ -261,6 +263,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
Left(errs)
}

/** Requires Kafka >= 0.8.1.1 */
def setConsumerOffsets(
groupId: String,
offsets: Map[TopicAndPartition, Long]
Expand All @@ -270,6 +273,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
})
}

/** Requires Kafka >= 0.8.1.1 */
def setConsumerOffsetMetadata(
groupId: String,
metadata: Map[TopicAndPartition, OffsetMetadataAndError]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ final class OffsetRange private(
}

object OffsetRange {
private[spark]
type OffsetRangeTuple = (String, Int, Long, Long)

def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
new OffsetRange(topic, partition, fromOffset, untilOffset)

Expand All @@ -63,6 +60,10 @@ object OffsetRange {
untilOffset: Long): OffsetRange =
new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)

/** this is to avoid ClassNotFoundException during checkpoint restore */
private[spark]
type OffsetRangeTuple = (String, Int, Long, Long)

private[streaming]
def apply(t: OffsetRangeTuple) =
new OffsetRange(t._1, t._2, t._3, t._4)
Expand Down

0 comments on commit 0df3ebe

Please sign in to comment.