[SPARK-4964] code cleanup per helena
koeninger committed Jan 9, 2015
1 parent adf99a6 commit 356c7cc
Showing 3 changed files with 26 additions and 22 deletions.
KafkaCluster.scala
@@ -99,7 +99,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
     if (result.keys.size == topicAndPartitions.size) {
       Right(result)
     } else {
-      val missing = topicAndPartitions.diff(result.keys.toSet)
+      val missing = topicAndPartitions.diff(result.keySet)
       val err = new Err
       err.append(new Exception(s"Couldn't find leaders for ${missing}"))
       Left(err)
@@ -192,7 +192,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
         return Right(result)
       }
     }
-    val missing = topicAndPartitions.diff(result.keys.toSet)
+    val missing = topicAndPartitions.diff(result.keySet)
     errs.append(new Exception(s"Couldn't find leader offsets for ${missing}"))
     Left(errs)
   }
@@ -219,7 +219,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
     withBrokers(seedBrokers, errs) { consumer =>
       val resp = consumer.fetchOffsets(req)
       val respMap = resp.requestInfo
-      val needed = topicAndPartitions.diff(result.keys.toSet)
+      val needed = topicAndPartitions.diff(result.keySet)
       needed.foreach { tp =>
         respMap.get(tp).foreach { offsetMeta =>
           if (offsetMeta.error == ErrorMapping.NoError) {
@@ -233,7 +233,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
         return Right(result)
       }
     }
-    val missing = topicAndPartitions.diff(result.keys.toSet)
+    val missing = topicAndPartitions.diff(result.keySet)
     errs.append(new Exception(s"Couldn't find consumer offsets for ${missing}"))
     Left(errs)
   }
@@ -254,11 +254,11 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
     var result = Map[TopicAndPartition, Short]()
     val req = OffsetCommitRequest(groupId, metadata)
     val errs = new Err
-    val topicAndPartitions = metadata.keys.toSet
+    val topicAndPartitions = metadata.keySet
     withBrokers(seedBrokers, errs) { consumer =>
       val resp = consumer.commitOffsets(req)
       val respMap = resp.requestInfo
-      val needed = topicAndPartitions.diff(result.keys.toSet)
+      val needed = topicAndPartitions.diff(result.keySet)
       needed.foreach { tp =>
         respMap.get(tp).foreach { err =>
           if (err == ErrorMapping.NoError) {
@@ -272,7 +272,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
         return Right(result)
       }
     }
-    val missing = topicAndPartitions.diff(result.keys.toSet)
+    val missing = topicAndPartitions.diff(result.keySet)
     errs.append(new Exception(s"Couldn't set offsets for ${missing}"))
     Left(errs)
   }
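Every hunk in this file makes the same substitution: `result.keys.toSet` becomes `result.keySet`. A minimal sketch of why the two are interchangeable (plain Scala, hypothetical values): `Map#keySet` already returns a `Set`, so going through `keys` only adds a redundant `Iterable`-to-`Set` conversion.

```scala
// Minimal sketch with hypothetical values: Map#keySet is already a Set,
// so m.keys.toSet rebuilds the same Set with an extra conversion pass.
val m = Map("t-0" -> 1L, "t-1" -> 2L)
val viaKeys: Set[String] = m.keys.toSet // Iterable[String] converted to a Set
val direct: Set[String]  = m.keySet     // no conversion needed
assert(viaKeys == direct)               // same contents either way
```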
KafkaRDD.scala
@@ -24,7 +24,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.util.NextIterator

 import java.util.Properties
-import kafka.api.FetchRequestBuilder
+import kafka.api.{FetchRequestBuilder, FetchResponse}
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 import kafka.message.{MessageAndMetadata, MessageAndOffset}
@@ -104,6 +104,20 @@ class KafkaRDD[
     var requestOffset = part.fromOffset
     var iter: Iterator[MessageAndOffset] = null

+    def handleErr(resp: FetchResponse) {
+      if (resp.hasError) {
+        val err = resp.errorCode(part.topic, part.partition)
+        if (err == ErrorMapping.LeaderNotAvailableCode ||
+          err == ErrorMapping.NotLeaderForPartitionCode) {
+          log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
+            s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
+          Thread.sleep(kc.config.refreshLeaderBackoffMs)
+        }
+        // Let normal rdd retry sort out reconnect attempts
+        throw ErrorMapping.exceptionFor(err)
+      }
+    }
+
     override def close() = consumer.close()

     override def getNext: R = {
@@ -112,17 +126,8 @@
         addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes).
         build()
       val resp = consumer.fetch(req)
-      if (resp.hasError) {
-        val err = resp.errorCode(part.topic, part.partition)
-        if (err == ErrorMapping.LeaderNotAvailableCode ||
-          err == ErrorMapping.NotLeaderForPartitionCode) {
-          log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
-            s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
-          Thread.sleep(kc.config.refreshLeaderBackoffMs)
-        }
-        // Let normal rdd retry sort out reconnect attempts
-        throw ErrorMapping.exceptionFor(err)
-      }
+      handleErr(resp)
       // kafka may return a batch that starts before the requested offset
       iter = resp.messageSet(part.topic, part.partition)
         .iterator
         .dropWhile(_.offset < requestOffset)
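The inline error block moves into the `handleErr` helper above, and its comment leaves reconnection to "normal rdd retry": the thrown exception fails the Spark task, and the scheduler re-runs it with a fresh consumer. A rough sketch of the standard knob governing those re-runs (ordinary Spark configuration, not part of this diff; app name hypothetical):

```scala
import org.apache.spark.SparkConf

// Rough sketch: a task that throws is re-attempted up to
// spark.task.maxFailures times (default 4); each attempt rebuilds the
// consumer, reconnecting to whichever broker is the current leader.
val conf = new SparkConf()
  .setAppName("kafka-rdd-example")    // hypothetical app name
  .set("spark.task.maxFailures", "4") // the default retry bound
```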
DeterministicKafkaInputDStream.scala
@@ -77,7 +77,7 @@ class DeterministicKafkaInputDStream[

   @tailrec
   private def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, Long] = {
-    val o = kc.getLatestLeaderOffsets(currentOffsets.keys.toSet)
+    val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
     // Either.fold would confuse @tailrec, do it manually
     if (o.isLeft) {
       val err = o.left.get.toString
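The in-code comment notes that `Either.fold` "would confuse @tailrec". A minimal sketch of the distinction, with hypothetical names: a recursive call placed inside a function value passed to `fold` is not a tail call of the enclosing method, so the compiler rejects the annotation; branching manually keeps the call in tail position.

```scala
import scala.annotation.tailrec

object RetryExample {
  def lookup(): Either[String, Long] = Right(42L) // hypothetical stand-in

  @tailrec
  def withRetries(o: Either[String, Long], retries: Int): Long =
    if (o.isLeft) {
      if (retries <= 0) sys.error(o.left.get) // out of retries: give up
      else withRetries(lookup(), retries - 1) // tail position: @tailrec accepts
    } else o.right.get

  // The fold version would not compile under @tailrec: the recursive call
  // sits inside a lambda, so it is not a tail call of withRetries itself.
  // def withRetries(o: Either[String, Long], retries: Int): Long =
  //   o.fold(_ => withRetries(lookup(), retries - 1), identity)
}
```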
@@ -96,8 +96,7 @@ class DeterministicKafkaInputDStream[
   private def clamp(leaderOffsets: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] = {
     maxMessagesPerPartition.map { mmp =>
       leaderOffsets.map { kv =>
-        val (k, v) = kv
-        k -> Math.min(currentOffsets(k) + mmp, v)
+        kv._1 -> Math.min(currentOffsets(kv._1) + mmp, kv._2)
       }
     }.getOrElse(leaderOffsets)
   }
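`clamp` caps how far each partition may advance in a single batch at `currentOffsets + maxMessagesPerPartition`. A minimal worked sketch, with `String` keys standing in for `TopicAndPartition` and hypothetical offsets:

```scala
// Minimal worked sketch; String keys stand in for TopicAndPartition.
val currentOffsets = Map("t-0" -> 10L, "t-1" -> 500L)
val leaderOffsets  = Map("t-0" -> 1000L, "t-1" -> 520L)
val mmp = 100L // hypothetical maxMessagesPerPartition

val clamped = leaderOffsets.map { kv =>
  kv._1 -> Math.min(currentOffsets(kv._1) + mmp, kv._2)
}
// clamped == Map("t-0" -> 110L, "t-1" -> 520L):
// t-0 is capped at current + mmp; t-1's leader offset is already within the cap.
```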
