[SPARK-17834][SQL] Fetch the earliest offsets manually in KafkaSource instead of counting on KafkaConsumer

## What changes were proposed in this pull request?

Because `KafkaConsumer.poll(0)` may update the partition offsets, this PR calls `seekToBeginning` to set the earliest offsets manually when KafkaSource computes its initial offsets, instead of relying on the positions the consumer reports after `poll(0)`.
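
For context, the consumer-side pattern the patch moves to (poll once to get an assignment, pause, then `seekToBeginning` and read back `position`) looks roughly like the standalone sketch below. This is illustration only, not code from the patch; the bootstrap server, group id, and topic name are made-up placeholders.

```scala
import java.util.Properties
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object EarliestOffsetsExample {
  def main(args: Array[String]): Unit = {
    // Placeholder connection settings -- not part of this patch.
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-probe")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      consumer.subscribe(java.util.Arrays.asList("some-topic"))

      // poll(0) is used only to trigger partition assignment; it may also move the
      // consumer's positions, which is exactly why they are set explicitly below.
      consumer.poll(0)
      val partitions = consumer.assignment()
      consumer.pause(partitions)

      // Seek to the beginning and read the positions back, rather than trusting
      // whatever offsets poll(0) left behind.
      consumer.seekToBeginning(partitions)
      val earliestOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
      println(s"Earliest offsets: $earliestOffsets")
    } finally {
      consumer.close()
    }
  }
}
```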

## How was this patch tested?

Existing tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#15397 from zsxwing/SPARK-17834.
zsxwing authored and Robert Kruszewski committed Oct 31, 2016
1 parent 99ec902 commit 75ef850
Showing 2 changed files with 48 additions and 26 deletions.
KafkaSource.scala
@@ -82,6 +82,7 @@ private[kafka010] case class KafkaSource(
     executorKafkaParams: ju.Map[String, Object],
     sourceOptions: Map[String, String],
     metadataPath: String,
+    startFromEarliestOffset: Boolean,
     failOnDataLoss: Boolean)
   extends Source with Logging {

@@ -109,7 +110,11 @@ private[kafka010] case class KafkaSource(
   private lazy val initialPartitionOffsets = {
     val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
     metadataLog.get(0).getOrElse {
-      val offsets = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = false))
+      val offsets = if (startFromEarliestOffset) {
+        KafkaSourceOffset(fetchEarliestOffsets())
+      } else {
+        KafkaSourceOffset(fetchLatestOffsets())
+      }
       metadataLog.add(0, offsets)
       logInfo(s"Initial offsets: $offsets")
       offsets
@@ -123,7 +128,7 @@ private[kafka010] case class KafkaSource(
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets

-    val offset = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = true))
+    val offset = KafkaSourceOffset(fetchLatestOffsets())
     logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
     Some(offset)
   }
@@ -227,26 +232,34 @@ private[kafka010] case class KafkaSource(
   override def toString(): String = s"KafkaSource[$consumerStrategy]"

   /**
-   * Fetch the offset of a partition, either seek to the latest offsets or use the current offsets
-   * in the consumer.
+   * Fetch the earliest offsets of partitions.
    */
-  private def fetchPartitionOffsets(
-      seekToEnd: Boolean): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
-    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
-    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
+  private def fetchEarliestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
     // Poll to get the latest assigned partitions
     consumer.poll(0)
     val partitions = consumer.assignment()
     consumer.pause(partitions)
-    logDebug(s"Partitioned assigned to consumer: $partitions")
+    logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the beginning")

-    // Get the current or latest offset of each partition
-    if (seekToEnd) {
-      consumer.seekToEnd(partitions)
-      logDebug("Seeked to the end")
-    }
+    consumer.seekToBeginning(partitions)
+    val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
+    logDebug(s"Got earliest offsets for partition : $partitionOffsets")
+    partitionOffsets
+  }
+
+  /**
+   * Fetch the latest offset of partitions.
+   */
+  private def fetchLatestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
+    // Poll to get the latest assigned partitions
+    consumer.poll(0)
+    val partitions = consumer.assignment()
+    consumer.pause(partitions)
+    logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")
+
+    consumer.seekToEnd(partitions)
     val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
-    logDebug(s"Got offsets for partition : $partitionOffsets")
+    logDebug(s"Got latest offsets for partition : $partitionOffsets")
     partitionOffsets
   }

Expand All @@ -256,22 +269,21 @@ private[kafka010] case class KafkaSource(
*/
private def fetchNewPartitionEarliestOffsets(
newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
// Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
consumer.pause(partitions)
logDebug(s"\tPartitioned assigned to consumer: $partitions")

// Get the earliest offset of each partition
consumer.seekToBeginning(partitions)
val partitionToOffsets = newPartitions.filter { p =>
val partitionOffsets = newPartitions.filter { p =>
// When deleting topics happen at the same time, some partitions may not be in `partitions`.
// So we need to ignore them
partitions.contains(p)
}.map(p => p -> consumer.position(p)).toMap
logDebug(s"Got offsets for new partitions: $partitionToOffsets")
partitionToOffsets
logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
partitionOffsets
}

/**
@@ -284,6 +296,9 @@ private[kafka010] case class KafkaSource(
    */
   private def withRetriesWithoutInterrupt(
       body: => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
+    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
+
     synchronized {
       var result: Option[Map[TopicPartition, Long]] = None
       var attempt = 1
KafkaSourceProvider.scala
@@ -77,10 +77,15 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
     // id. Hence, we should generate a unique id for each query.
     val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

-    val autoOffsetResetValue = caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY) match {
-      case Some(value) => value.trim() // same values as those supported by auto.offset.reset
-      case None => "latest"
-    }
+    val startFromEarliestOffset =
+      caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY).map(_.trim.toLowerCase) match {
+        case Some("latest") => false
+        case Some("earliest") => true
+        case Some(pos) =>
+          // This should not happen since we have already checked the options.
+          throw new IllegalStateException(s"Invalid $STARTING_OFFSET_OPTION_KEY: $pos")
+        case None => false
+      }

     val kafkaParamsForStrategy =
       ConfigUpdater("source", specifiedKafkaParams)
@@ -90,8 +95,9 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
         // So that consumers in Kafka source do not mess with any existing group id
         .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver")

-        // So that consumers can start from earliest or latest
-        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetValue)
+        // Set to "latest" to avoid exceptions. However, KafkaSource will fetch the initial offsets
+        // by itself instead of counting on KafkaConsumer.
+        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

         // So that consumers in the driver does not commit offsets unnecessarily
         .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -147,6 +153,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
       kafkaParamsForExecutors,
       parameters,
       metadataPath,
+      startFromEarliestOffset,
       failOnDataLoss)
   }

