diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 1be70db87497e..1b22a5fd19c35 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -82,6 +82,7 @@ private[kafka010] case class KafkaSource(
     executorKafkaParams: ju.Map[String, Object],
     sourceOptions: Map[String, String],
     metadataPath: String,
+    startFromEarliestOffset: Boolean,
     failOnDataLoss: Boolean)
   extends Source with Logging {
 
@@ -109,7 +110,11 @@ private[kafka010] case class KafkaSource(
   private lazy val initialPartitionOffsets = {
     val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
     metadataLog.get(0).getOrElse {
-      val offsets = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = false))
+      val offsets = if (startFromEarliestOffset) {
+        KafkaSourceOffset(fetchEarliestOffsets())
+      } else {
+        KafkaSourceOffset(fetchLatestOffsets())
+      }
       metadataLog.add(0, offsets)
       logInfo(s"Initial offsets: $offsets")
       offsets
@@ -123,7 +128,7 @@ private[kafka010] case class KafkaSource(
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
-    val offset = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = true))
+    val offset = KafkaSourceOffset(fetchLatestOffsets())
     logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
     Some(offset)
   }
@@ -227,11 +232,9 @@ private[kafka010] case class KafkaSource(
   override def toString(): String = s"KafkaSource[$consumerStrategy]"
 
   /**
-   * Fetch the offset of a partition, either seek to the latest offsets or use the current offsets
-   * in the consumer.
+   * Fetch the earliest offsets of partitions.
    */
-  private def fetchPartitionOffsets(
-      seekToEnd: Boolean): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
+  private def fetchEarliestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
     // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
     assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
     // Poll to get the latest assigned partitions
@@ -240,11 +243,27 @@ private[kafka010] case class KafkaSource(
     consumer.pause(partitions)
     logDebug(s"Partitioned assigned to consumer: $partitions")
 
-    // Get the current or latest offset of each partition
-    if (seekToEnd) {
-      consumer.seekToEnd(partitions)
-      logDebug("Seeked to the end")
-    }
+    logDebug("Seeked to the beginning")
+    consumer.seekToBeginning(partitions)
+    val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
+    logDebug(s"Got offsets for partition : $partitionOffsets")
+    partitionOffsets
+  }
+
+  /**
+   * Fetch the latest offset of partitions.
+   */
+  private def fetchLatestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
+    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
+    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
+    // Poll to get the latest assigned partitions
+    consumer.poll(0)
+    val partitions = consumer.assignment()
+    consumer.pause(partitions)
+    logDebug(s"Partitioned assigned to consumer: $partitions")
+
+    logDebug("Seeked to the end")
+    consumer.seekToEnd(partitions)
     val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
     logDebug(s"Got offsets for partition : $partitionOffsets")
     partitionOffsets
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 1b0a2fe955d03..23b1b60f3bcaa 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -77,10 +77,15 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
     // id. Hence, we should generate a unique id for each query.
     val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
 
-    val autoOffsetResetValue = caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY) match {
-      case Some(value) => value.trim() // same values as those supported by auto.offset.reset
-      case None => "latest"
-    }
+    val startFromEarliestOffset =
+      caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY).map(_.trim.toLowerCase) match {
+        case Some("latest") => false
+        case Some("earliest") => true
+        case Some(pos) =>
+          // This should not happen since we have already checked the options.
+          throw new IllegalStateException(s"Invalid $STARTING_OFFSET_OPTION_KEY: $pos")
+        case None => false
+      }
 
     val kafkaParamsForStrategy =
       ConfigUpdater("source", specifiedKafkaParams)
@@ -90,8 +95,9 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
         // So that consumers in Kafka source do not mess with any existing group id
         .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver")
 
-        // So that consumers can start from earliest or latest
-        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetValue)
+        // Set to "latest" to avoid exceptions. However, KafkaSource will fetch the initial offsets
+        // by itself instead of counting on KafkaConsumer.
+        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
 
         // So that consumers in the driver does not commit offsets unnecessarily
        .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -147,6 +153,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
      kafkaParamsForExecutors,
      parameters,
      metadataPath,
+     startFromEarliestOffset,
      failOnDataLoss)
   }
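
For reference, a minimal sketch of how a streaming query would exercise the new code path, assuming the user-facing option key behind STARTING_OFFSET_OPTION_KEY is "startingOffset" and using placeholder broker/topic names:

    // Hypothetical usage; KafkaSourceProvider validates the value and maps
    // "earliest" to startFromEarliestOffset = true, so KafkaSource seeks to the
    // beginning of each partition when recording the initial offsets.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "topic1")
      .option("startingOffset", "earliest")
      .load()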