Fetch the earliest offsets manually in KafkaSource instead of counting on KafkaConsumer
zsxwing committed Oct 7, 2016
1 parent 9d8ae85 commit 95a0c96
Showing 2 changed files with 43 additions and 17 deletions.
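
In outline: KafkaSource gains a startFromEarliestOffset flag, computes its own starting offsets for the first batch by seeking the driver consumer itself, and persists them in the metadata log, so Kafka's auto.offset.reset no longer decides where a query starts. A condensed sketch of the new initialization flow, assembled from the hunks below (surrounding class elided, so not compilable on its own):

// Condensed from the diff below, for orientation only.
private lazy val initialPartitionOffsets = {
  val metadataLog =
    new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
  metadataLog.get(0).getOrElse {
    // Decide once, then persist, so a restarted query replays the same choice
    // instead of consulting Kafka's auto.offset.reset again.
    val offsets =
      if (startFromEarliestOffset) KafkaSourceOffset(fetchEarliestOffsets())
      else KafkaSourceOffset(fetchLatestOffsets())
    metadataLog.add(0, offsets)
    offsets
  }
}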
KafkaSource.scala
@@ -82,6 +82,7 @@ private[kafka010] case class KafkaSource(
     executorKafkaParams: ju.Map[String, Object],
     sourceOptions: Map[String, String],
     metadataPath: String,
+    startFromEarliestOffset: Boolean,
     failOnDataLoss: Boolean)
   extends Source with Logging {
 
@@ -109,7 +110,11 @@ private[kafka010] case class KafkaSource(
   private lazy val initialPartitionOffsets = {
     val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
     metadataLog.get(0).getOrElse {
-      val offsets = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = false))
+      val offsets = if (startFromEarliestOffset) {
+        KafkaSourceOffset(fetchEarliestOffsets())
+      } else {
+        KafkaSourceOffset(fetchLatestOffsets())
+      }
       metadataLog.add(0, offsets)
       logInfo(s"Initial offsets: $offsets")
       offsets
@@ -123,7 +128,7 @@
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
-    val offset = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = true))
+    val offset = KafkaSourceOffset(fetchLatestOffsets())
     logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
     Some(offset)
   }
@@ -227,11 +232,9 @@ private[kafka010] case class KafkaSource(
   override def toString(): String = s"KafkaSource[$consumerStrategy]"
 
   /**
-   * Fetch the offset of a partition, either seek to the latest offsets or use the current offsets
-   * in the consumer.
+   * Fetch the earliest offsets of partitions.
    */
-  private def fetchPartitionOffsets(
-      seekToEnd: Boolean): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
+  private def fetchEarliestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
     // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
     assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
     // Poll to get the latest assigned partitions
@@ -240,11 +243,27 @@
     consumer.pause(partitions)
     logDebug(s"Partitioned assigned to consumer: $partitions")
 
-    // Get the current or latest offset of each partition
-    if (seekToEnd) {
-      consumer.seekToEnd(partitions)
-      logDebug("Seeked to the end")
-    }
+    logDebug("Seeked to the beginning")
+    consumer.seekToBeginning(partitions)
     val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
     logDebug(s"Got offsets for partition : $partitionOffsets")
     partitionOffsets
   }
+
+  /**
+   * Fetch the latest offsets of partitions.
+   */
+  private def fetchLatestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
+    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
+    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
+    // Poll to get the latest assigned partitions
+    consumer.poll(0)
+    val partitions = consumer.assignment()
+    consumer.pause(partitions)
+    logDebug(s"Partitioned assigned to consumer: $partitions")
+
+    logDebug("Seeked to the end")
+    consumer.seekToEnd(partitions)
+    val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
+    logDebug(s"Got offsets for partition : $partitionOffsets")
+    partitionOffsets
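
Both new fetch methods follow the same driver-side consumer pattern; in the real code it is additionally wrapped in withRetriesWithoutInterrupt and asserted to run on the StreamExecutionThread so that KafkaConsumer.poll is never interrupted (KAFKA-1894). A minimal standalone sketch of that shared pattern; the fetchOffsets helper and its consumer parameter are illustrative, not part of this commit:

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Illustrative only: the shared shape of fetchEarliestOffsets() and
// fetchLatestOffsets(), with the seek direction as a parameter.
def fetchOffsets(
    consumer: KafkaConsumer[Array[Byte], Array[Byte]],
    seekToEnd: Boolean): Map[TopicPartition, Long] = {
  consumer.poll(0)                         // triggers partition assignment; no records needed
  val partitions = consumer.assignment()
  consumer.pause(partitions)               // ensure later polls fetch no records
  if (seekToEnd) {
    consumer.seekToEnd(partitions)         // latest offsets
  } else {
    consumer.seekToBeginning(partitions)   // earliest offsets
  }
  // position() reports where the consumer would read next after the seek
  partitions.asScala.map(p => p -> consumer.position(p)).toMap
}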
KafkaSourceProvider.scala
@@ -77,10 +77,15 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
     // id. Hence, we should generate a unique id for each query.
     val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
 
-    val autoOffsetResetValue = caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY) match {
-      case Some(value) => value.trim() // same values as those supported by auto.offset.reset
-      case None => "latest"
-    }
+    val startFromEarliestOffset =
+      caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY).map(_.trim.toLowerCase) match {
+        case Some("latest") => false
+        case Some("earliest") => true
+        case Some(pos) =>
+          // This should not happen since we have already checked the options.
+          throw new IllegalStateException(s"Invalid $STARTING_OFFSET_OPTION_KEY: $pos")
+        case None => false
+      }
 
     val kafkaParamsForStrategy =
       ConfigUpdater("source", specifiedKafkaParams)
@@ -90,8 +95,9 @@
         // So that consumers in Kafka source do not mess with any existing group id
         .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver")
 
-        // So that consumers can start from earliest or latest
-        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetValue)
+        // Set to "latest" to avoid exceptions. However, KafkaSource will fetch the initial offsets
+        // by itself instead of counting on KafkaConsumer.
+        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
 
         // So that consumers in the driver does not commit offsets unnecessarily
         .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -147,6 +153,7 @@
       kafkaParamsForExecutors,
       parameters,
       metadataPath,
+      startFromEarliestOffset,
       failOnDataLoss)
   }
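
On the provider side, the user-facing starting-offset option is translated into the boolean flag exactly once, and auto.offset.reset is pinned to "latest" only so the driver consumer never throws on an out-of-range position; the source now decides where to start on its own. A usage sketch, assuming the option name is "startingoffset" (the value of STARTING_OFFSET_OPTION_KEY is not shown in this diff) and an existing SparkSession named spark:

// Hypothetical usage; the option name "startingoffset" is an assumption.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "topic1")
  .option("startingoffset", "earliest")  // maps to startFromEarliestOffset = true
  .load()                                // omit the option to default to latest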
