[SPARK-17834][SQL]Fetch the earliest offsets manually in KafkaSource instead of counting on KafkaConsumer #15397

Closed · wants to merge 3 commits (changes from all commits shown below)
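For context, the behavior changed by this PR is selected through the source's startingOffset option (the STARTING_OFFSET_OPTION_KEY handled in KafkaSourceProvider below). A minimal usage sketch, not part of this diff; the option key spelling, broker address, topic, and app name are assumptions/placeholders:

import org.apache.spark.sql.SparkSession

// Sketch only: "startingOffset" is assumed to be the user-facing key for
// STARTING_OFFSET_OPTION_KEY (options are matched case-insensitively).
val spark = SparkSession.builder().appName("kafka-earliest-example").getOrCreate()

// "earliest" makes KafkaSource fetch the beginning offsets itself (the change in this PR);
// "latest" (the default) starts from the end of each partition.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("startingOffset", "earliest")
  .load()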
@@ -82,6 +82,7 @@ private[kafka010] case class KafkaSource(
executorKafkaParams: ju.Map[String, Object],
sourceOptions: Map[String, String],
metadataPath: String,
startFromEarliestOffset: Boolean,
failOnDataLoss: Boolean)
extends Source with Logging {

@@ -109,7 +110,11 @@ private[kafka010] case class KafkaSource(
private lazy val initialPartitionOffsets = {
val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
metadataLog.get(0).getOrElse {
val offsets = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = false))
val offsets = if (startFromEarliestOffset) {
KafkaSourceOffset(fetchEarliestOffsets())
} else {
KafkaSourceOffset(fetchLatestOffsets())
}
metadataLog.add(0, offsets)
logInfo(s"Initial offsets: $offsets")
offsets
@@ -123,7 +128,7 @@ private[kafka010] case class KafkaSource(
// Make sure initialPartitionOffsets is initialized
initialPartitionOffsets

val offset = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = true))
val offset = KafkaSourceOffset(fetchLatestOffsets())
logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
Some(offset)
}
@@ -227,26 +232,34 @@ private[kafka010] case class KafkaSource(
override def toString(): String = s"KafkaSource[$consumerStrategy]"

/**
* Fetch the offset of a partition, either seek to the latest offsets or use the current offsets
* in the consumer.
* Fetch the earliest offsets of partitions.
*/
private def fetchPartitionOffsets(
seekToEnd: Boolean): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
// Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
private def fetchEarliestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
consumer.pause(partitions)
logDebug(s"Partitioned assigned to consumer: $partitions")
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the beginning")

// Get the current or latest offset of each partition
if (seekToEnd) {
consumer.seekToEnd(partitions)
logDebug("Seeked to the end")
}
consumer.seekToBeginning(partitions)
val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
logDebug(s"Got earliest offsets for partition : $partitionOffsets")
partitionOffsets
}

/**
* Fetch the latest offset of partitions.
*/
private def fetchLatestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
consumer.pause(partitions)
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")

consumer.seekToEnd(partitions)
val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
logDebug(s"Got offsets for partition : $partitionOffsets")
logDebug(s"Got latest offsets for partition : $partitionOffsets")
partitionOffsets
}
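Both fetchEarliestOffsets and fetchLatestOffsets above follow the same driver-side pattern: poll(0) to trigger partition assignment, pause the assigned partitions so no records are actually fetched, seek to one end of each partition, then read back consumer.position. A standalone sketch of that pattern against a plain KafkaConsumer, outside Spark; the bootstrap server, group id, and topic name are placeholders:

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-probe")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
consumer.subscribe(Collections.singletonList("topic1"))
consumer.poll(0)                        // triggers partition assignment
val partitions = consumer.assignment()
consumer.pause(partitions)              // stay assigned, but fetch no records
consumer.seekToBeginning(partitions)    // use seekToEnd(partitions) for the latest offsets
val earliestOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
consumer.close()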

@@ -256,22 +269,21 @@ private[kafka010] case class KafkaSource(
*/
private def fetchNewPartitionEarliestOffsets(
newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
// Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
Inline review comment (Contributor): Is there a reason not to pause all partitions here as well?

consumer.pause(partitions)
logDebug(s"\tPartitioned assigned to consumer: $partitions")

// Get the earliest offset of each partition
consumer.seekToBeginning(partitions)
val partitionToOffsets = newPartitions.filter { p =>
val partitionOffsets = newPartitions.filter { p =>
// When deleting topics happen at the same time, some partitions may not be in `partitions`.
// So we need to ignore them
partitions.contains(p)
}.map(p => p -> consumer.position(p)).toMap
logDebug(s"Got offsets for new partitions: $partitionToOffsets")
partitionToOffsets
logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
partitionOffsets
}

/**
@@ -284,6 +296,9 @@ private[kafka010] case class KafkaSource(
*/
private def withRetriesWithoutInterrupt(
body: => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
// Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])

synchronized {
var result: Option[Map[TopicPartition, Long]] = None
var attempt = 1
@@ -77,10 +77,15 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

val autoOffsetResetValue = caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY) match {
case Some(value) => value.trim() // same values as those supported by auto.offset.reset
case None => "latest"
}
val startFromEarliestOffset =
caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY).map(_.trim.toLowerCase) match {
case Some("latest") => false
case Some("earliest") => true
case Some(pos) =>
// This should not happen since we have already checked the options.
throw new IllegalStateException(s"Invalid $STARTING_OFFSET_OPTION_KEY: $pos")
case None => false
}
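The IllegalStateException branch assumes the option value was validated before createSource is called; that validation is not part of this diff. A hypothetical sketch of such an upfront check, with an illustrative helper name and message wording:

// Hypothetical pre-check, run when the source is first configured.
private def validateStartingOffset(params: Map[String, String]): Unit = {
  params.get(STARTING_OFFSET_OPTION_KEY).map(_.trim.toLowerCase).foreach {
    case "earliest" | "latest" => // supported values
    case other => throw new IllegalArgumentException(
      s"$STARTING_OFFSET_OPTION_KEY must be 'earliest' or 'latest', but was '$other'")
  }
}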

val kafkaParamsForStrategy =
ConfigUpdater("source", specifiedKafkaParams)
@@ -90,8 +95,9 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
// So that consumers in Kafka source do not mess with any existing group id
.set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver")

// So that consumers can start from earliest or latest
.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetValue)
// Set to "latest" to avoid exceptions. However, KafkaSource will fetch the initial offsets
// by itself instead of counting on KafkaConsumer.
.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

// So that consumers in the driver does not commit offsets unnecessarily
.set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -147,6 +153,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
kafkaParamsForExecutors,
parameters,
metadataPath,
startFromEarliestOffset,
failOnDataLoss)
}
