From c1bd6d9d78802607e1c23b0dcfe98ddfc0b71c08 Mon Sep 17 00:00:00 2001
From: cody koeninger
Date: Wed, 14 Jan 2015 14:07:18 -0600
Subject: [PATCH] [SPARK-4964] use newly available attemptNumber for correct retry behavior

---
 .../src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala
index 31601538435a6..59efd1ac84da7 100644
--- a/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala
@@ -88,11 +88,10 @@ class KafkaRDD[
         var requestOffset = part.fromOffset
         var iter: Iterator[MessageAndOffset] = null
 
-        // TODO broken until SPARK-4014 is resolved and attemptId / attemptNumber is meaningful.
         // The idea is to use the provided preferred host, except on task retry atttempts,
         // to minimize number of kafka metadata requests
         private def connectLeader: SimpleConsumer = {
-          if (context.attemptId > 0) {
+          if (context.attemptNumber > 0) {
             kc.connectLeader(part.topic, part.partition).fold(
               errs => throw new Exception(
                 s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
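
For context: TaskContext.attemptNumber starts at 0 on a task's first attempt and increases with each retry, while attemptId is a unique attempt identifier rather than a retry counter, so the old check attemptId > 0 could not reliably tell a retry apart from a first attempt. Below is a rough standalone sketch of the intended retry-aware behavior, not the patch's actual code; the connectPreferred and refreshLeaderAndConnect helpers are placeholder names, not part of the KafkaRDD API.

import org.apache.spark.TaskContext

object RetryAwareConnect {
  // Placeholder helpers standing in for the real connection logic.
  def connectPreferred(host: String): Unit =
    println(s"connecting to preferred host $host")
  def refreshLeaderAndConnect(): Unit =
    println("looking up the current leader from metadata and connecting")

  def connect(context: TaskContext, preferredHost: String): Unit = {
    if (context.attemptNumber > 0) {
      // Retry attempt: the preferred host may be stale or unreachable,
      // so pay the cost of a metadata lookup to find the current leader.
      refreshLeaderAndConnect()
    } else {
      // First attempt: trust the scheduler's preferred host and avoid
      // an extra Kafka metadata request.
      connectPreferred(preferredHost)
    }
  }
}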