[SPARK-23623] [SS] Avoid concurrent use of cached consumers in CachedKafkaConsumer #20767

Closed · wants to merge 5 commits
Changes from 2 commits
@@ -187,8 +187,7 @@ class KafkaContinuousDataReader(
kafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] {
private val consumer =
CachedKafkaConsumer.createUncached(topicPartition.topic, topicPartition.partition, kafkaParams)
private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
private val converter = new KafkaRecordToUnsafeRowConverter

private var nextKafkaOffset = startOffset
@@ -236,6 +235,6 @@ class KafkaContinuousDataReader(
}

override def close(): Unit = {
consumer.close()
consumer.release()
}
}
@@ -27,26 +27,68 @@ import org.apache.kafka.common.TopicPartition

import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread

private[kafka010] sealed trait KafkaDataConsumer {
/**
* Get the record for the given offset if available. Otherwise it will either throw an error
* (if failOnDataLoss = true), or return the record for the next available offset within
* [offset, untilOffset), or null.
*
* @param offset the offset to fetch.
* @param untilOffset the max offset to fetch. Exclusive.
* @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
* @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return the record
* at `offset` if available, or throw an exception. When `failOnDataLoss` is `false`,
* this method will either return the record at `offset` if available, or return
* the next earliest available record less than `untilOffset`, or null. It
* will not throw any exception.
*/
def get(
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss)
}

/**
* Return the available offset range of the current partition. It's a pair of the earliest offset
* and the latest offset.
*/
def getAvailableOffsetRange(): AvailableOffsetRange = internalConsumer.getAvailableOffsetRange()

/**
* Release this consumer from being further used. Depending on its implementation,
* this consumer will be either finalized, or reset for reuse later.
*/
def release(): Unit

/** Reference to the internal implementation that this wrapper delegates to */
protected def internalConsumer: InternalKafkaConsumer
}
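
The trait above is the whole surface a reader task needs: acquire a consumer, call get() over an offset range, and always release() it. The following is an editor's usage sketch, not code from this PR; readRange and the process callback are hypothetical, error handling is elided, and it assumes the sketch lives in org.apache.spark.sql.kafka010 so the package-private API defined in this file is visible.

import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: read [fromOffset, untilOffset) from one partition using
// the KafkaDataConsumer trait defined above.
def readRange(
    topicPartition: TopicPartition,
    kafkaParams: ju.Map[String, Object],
    fromOffset: Long,
    untilOffset: Long,
    pollTimeoutMs: Long)(process: ConsumerRecord[Array[Byte], Array[Byte]] => Unit): Unit = {
  val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = true)
  try {
    var offset = fromOffset
    while (offset < untilOffset) {
      // Returns the record at `offset` if available, or (with failOnDataLoss = false)
      // the next available record below `untilOffset`, or null when nothing is left.
      val record = consumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss = false)
      if (record == null) {
        offset = untilOffset
      } else {
        process(record)
        offset = record.offset + 1
      }
    }
  } finally {
    consumer.release() // return the consumer to the cache, or close it if non-cached
  }
}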


/**
* Consumer of single topicpartition, intended for cached reuse.
* Underlying consumer is not threadsafe, so neither is this,
* but processing the same topicpartition and group id in multiple threads is usually bad anyway.
* A wrapper around Kafka's KafkaConsumer that throws error when data loss is detected.
* This is not for direct use outside this file.
*/
private[kafka010] case class CachedKafkaConsumer private(
private[kafka010] case class InternalKafkaConsumer(
topicPartition: TopicPartition,
kafkaParams: ju.Map[String, Object]) extends Logging {
import CachedKafkaConsumer._
import InternalKafkaConsumer._

private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

private var consumer = createConsumer

/** indicates whether this consumer is in use or not */
private var inuse = true
@volatile var inuse = true

/** indicate whether this consumer is going to be stopped in the next release */
@volatile var markedForClose = false

/** Iterator over the already fetched data */
private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
@@ -61,8 +103,6 @@ private[kafka010] case class CachedKafkaConsumer private(
c
}

case class AvailableOffsetRange(earliest: Long, latest: Long)

private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match {
case ut: UninterruptibleThread =>
ut.runUninterruptibly(body)
@@ -313,21 +353,40 @@ private[kafka010] case class CachedKafkaConsumer private(
}
}

private[kafka010] object CachedKafkaConsumer extends Logging {

private val UNKNOWN_OFFSET = -2L
private[kafka010] object KafkaDataConsumer extends Logging {

case class AvailableOffsetRange(earliest: Long, latest: Long)

private case class CacheKey(groupId: String, topicPartition: TopicPartition)
private class CachedKafkaDataConsumer(val internalConsumer: InternalKafkaConsumer)
extends KafkaDataConsumer {
override def release(): Unit = {
releaseKafkaConsumer(internalConsumer.topicPartition, internalConsumer.kafkaParams)
}
}

private class NonCachedKafkaDataConsumer(val internalConsumer: InternalKafkaConsumer)
extends KafkaDataConsumer {
override def release(): Unit = {
internalConsumer.close()
}
}

private case class CacheKey(groupId: String, topicPartition: TopicPartition) {
def this(topicPartition: TopicPartition, kafkaParams: ju.Map[String, Object]) =
this(kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String], topicPartition)
}

private lazy val cache = {
val conf = SparkEnv.get.conf
val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer](capacity, 0.75f, true) {
override def removeEldestEntry(
entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
entry: ju.Map.Entry[CacheKey, InternalKafkaConsumer]): Boolean = {
if (entry.getValue.inuse == false && this.size > capacity) {
logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
s"removing consumer for ${entry.getKey}")
logWarning(
s"KafkaConsumer cache hitting max capacity of $capacity, " +
s"removing consumer for ${entry.getKey}")
try {
entry.getValue.close()
} catch {
@@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
}
}
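
The cache above is a plain java.util.LinkedHashMap in access order, with removeEldestEntry deciding eviction; the only twist is that an entry is evicted (and its consumer closed) only when the map is over capacity and the consumer is not in use. The self-contained sketch below is an editor's illustration of just the access-ordered LRU mechanics, without the in-use check; it is not part of the PR.

import java.{util => ju}

object LruCacheSketch {
  // accessOrder = true makes iteration run from least- to most-recently used,
  // and removeEldestEntry is consulted after every put.
  def boundedLruCache[K, V](capacity: Int): ju.Map[K, V] =
    new ju.LinkedHashMap[K, V](capacity, 0.75f, true) {
      override def removeEldestEntry(eldest: ju.Map.Entry[K, V]): Boolean =
        size() > capacity
    }

  def main(args: Array[String]): Unit = {
    val cache = boundedLruCache[String, Int](2)
    cache.put("a", 1)
    cache.put("b", 2)
    cache.get("a")          // touching "a" makes "b" the eldest entry
    cache.put("c", 3)       // over capacity: "b" is evicted, "a" survives
    println(cache.keySet()) // [a, c]
  }
}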

def releaseKafkaConsumer(
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): Unit = {
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
val topicPartition = new TopicPartition(topic, partition)
val key = CacheKey(groupId, topicPartition)
private def releaseKafkaConsumer(
topicPartition: TopicPartition,
kafkaParams: ju.Map[String, Object]): Unit = {
val key = new CacheKey(topicPartition, kafkaParams)

synchronized {
val consumer = cache.get(key)
if (consumer != null) {
consumer.inuse = false
if (consumer.markedForClose) {
consumer.close()
cache.remove(key)
Member: We should remove it only when it's closed.

} else {
consumer.inuse = false
}
} else {
logWarning(s"Attempting to release consumer that does not exist")
Member: This is the case that a consumer may be evicted because of the max capacity. In this case, we should close the internal consumer.

Contributor Author: Aah. The warning was misleading. Will add comments to clarify that.

Contributor Author: This should not be the case. We do not remove any consumer from the cache while it is being used. So the scenario that you mentioned should not happen.

}
}
}
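
Taking the two review comments above together, the release path has to distinguish three cases: the consumer is still the cached entry and stays cached, it was marked for close, or it was evicted from the cache while the task was still using it. One way to cover all three, sketched here by the editor as an assumption rather than the PR's final code, is to pass the consumer instance into release instead of looking it up only by key; it assumes the cache and InternalKafkaConsumer defined above are in scope inside the companion object.

// Editor's sketch; not the code that was merged.
private def releaseSketch(key: CacheKey, consumer: InternalKafkaConsumer): Unit = synchronized {
  if (cache.get(key) eq consumer) {
    if (consumer.markedForClose) {
      consumer.close()
      cache.remove(key)      // remove only once the consumer is actually closed
    } else {
      consumer.inuse = false // leave it cached for the next task
    }
  } else {
    // The consumer was evicted (e.g. the cache hit capacity) while still in use,
    // so nothing else tracks it: close it here.
    consumer.close()
  }
}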

/**
* Removes (and closes) the Kafka Consumer for the given topic, partition and group id.
*/
def removeKafkaConsumer(
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): Unit = {
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
val topicPartition = new TopicPartition(topic, partition)
val key = CacheKey(groupId, topicPartition)

synchronized {
val removedConsumer = cache.remove(key)
if (removedConsumer != null) {
removedConsumer.close()
}
}
}

/**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using kafkaParams.
*/
def getOrCreate(
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized {
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
val topicPartition = new TopicPartition(topic, partition)
val key = CacheKey(groupId, topicPartition)

// If this is reattempt at running the task, then invalidate cache and start with
// a new consumer
def acquire(
topicPartition: TopicPartition,
kafkaParams: ju.Map[String, Object],
useCache: Boolean): KafkaDataConsumer = synchronized {
val key = new CacheKey(topicPartition, kafkaParams)
val existingInternalConsumer = cache.get(key)

lazy val newNonCachedConsumer =
new NonCachedKafkaDataConsumer(new InternalKafkaConsumer(topicPartition, kafkaParams))

if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
removeKafkaConsumer(topic, partition, kafkaParams)
val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
consumer.inuse = true
cache.put(key, consumer)
consumer
} else {
if (!cache.containsKey(key)) {
cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams))
// If this is reattempt at running the task, then invalidate cache and start with
// a new consumer
if (existingInternalConsumer != null) {
Member: This logic here seems wrong. I think it should be something like this?

      if (existingInternalConsumer != null) {
        if (existingInternalConsumer.inuse) {
          existingInternalConsumer.markedForClose = true
          newNonCachedConsumer
        } else {
          existingInternalConsumer.close()
          cache.put(key, newNonCachedConsumer.internalConsumer)
          new CachedKafkaDataConsumer(newNonCachedConsumer.internalConsumer)
        }
      } else {
        cache.put(key, newNonCachedConsumer.internalConsumer)
        new CachedKafkaDataConsumer(newNonCachedConsumer.internalConsumer)
      }

Contributor Author: This is indeed better. What I was doing was always deferring to a later point. But that would lead to it being used one more time before being closed.

existingInternalConsumer.markedForClose = true
}
val consumer = cache.get(key)
consumer.inuse = true
consumer
newNonCachedConsumer
} else if (!useCache) {
Member: This if should be moved before the above if.

Contributor Author: Why? I am saying that we should NOT reuse consumers for ANY task retries, independent of the desire to use the cache or not.

Member: When useCache is false, I would expect newInternalConsumer to never be put into the cache. The above if may put newInternalConsumer into the cache. If we put a consumer used by a continuous processing query into the cache and assume it never ends, it will prevent other micro-batch queries from putting a consumer reading the same topic partition into the cache.

Contributor Author (tdas, Mar 16, 2018): Technically that won't happen because the continuous query and the batch query will have different group ids. But I agree that if useCache is false, then we should not put it in the cache in any way. In fact, we can simplify the task retry case further by never putting the new one in the cache, only invalidating the existing cached one. The only scenario where this will hurt a little is that the micro-batch immediately after the reattempt will create a new consumer. That is a tiny one-time cost in a scenario where the reattempt has already made it slightly costly.

newNonCachedConsumer
} else if (existingInternalConsumer == null) {
newNonCachedConsumer.internalConsumer.inuse = true
cache.put(key, newNonCachedConsumer.internalConsumer)
newNonCachedConsumer
Member: We should return a CachedKafkaDataConsumer in this case. Right?

Contributor Author: oh yes. damn it. my bad.

} else if (existingInternalConsumer.inuse) {
newNonCachedConsumer
} else {
new CachedKafkaDataConsumer(existingInternalConsumer)
}
}
}
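
Pulling the review threads above together, acquire ends up as a small decision tree: a task reattempt invalidates the cached consumer and never caches the new one; useCache = false never touches the cache; otherwise the caller gets the idle cached consumer, or a temporary non-cached one when the cached consumer is busy. The sketch below is the editor's consolidation of that discussion, assuming it sits inside the KafkaDataConsumer object so the types above are in scope; it is not necessarily the exact code that was merged.

// Editor's sketch of the acquire decision tree discussed above.
def acquireSketch(
    key: CacheKey,
    existing: InternalKafkaConsumer,          // cache.get(key); may be null
    isTaskReattempt: Boolean,                 // TaskContext.get.attemptNumber >= 1
    useCache: Boolean,
    newConsumer: => InternalKafkaConsumer): KafkaDataConsumer = synchronized {
  if (isTaskReattempt) {
    if (existing != null) existing.markedForClose = true  // close it on its next release
    new NonCachedKafkaDataConsumer(newConsumer)           // never cache on a retry
  } else if (!useCache) {
    new NonCachedKafkaDataConsumer(newConsumer)
  } else if (existing == null) {
    val consumer = newConsumer
    consumer.inuse = true
    cache.put(key, consumer)
    new CachedKafkaDataConsumer(consumer)
  } else if (existing.inuse) {
    new NonCachedKafkaDataConsumer(newConsumer)           // don't share a busy consumer
  } else {
    existing.inuse = true
    new CachedKafkaDataConsumer(existing)
  }
}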

/** Create an [[CachedKafkaConsumer]] but don't put it into cache. */
def createUncached(
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = {
new CachedKafkaConsumer(new TopicPartition(topic, partition), kafkaParams)
}
private[kafka010] object InternalKafkaConsumer extends Logging {

private val UNKNOWN_OFFSET = -2L

private def reportDataLoss0(
failOnDataLoss: Boolean,
@@ -321,17 +321,8 @@ private[kafka010] case class KafkaMicroBatchDataReader(
failOnDataLoss: Boolean,
reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with Logging {

private val consumer = {
if (!reuseKafkaConsumer) {
// If we can't reuse CachedKafkaConsumers, creating a new CachedKafkaConsumer. We
// uses `assign` here, hence we don't need to worry about the "group.id" conflicts.
CachedKafkaConsumer.createUncached(
offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
} else {
CachedKafkaConsumer.getOrCreate(
offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
}
}
private val consumer = KafkaDataConsumer.acquire(
offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)

private val rangeToRead = resolveRange(offsetRange)
private val converter = new KafkaRecordToUnsafeRowConverter
@@ -360,14 +351,7 @@ private[kafka010] case class KafkaMicroBatchDataReader(
}

override def close(): Unit = {
if (!reuseKafkaConsumer) {
// Don't forget to close non-reuse KafkaConsumers. You may take down your cluster!
consumer.close()
} else {
// Indicate that we're no longer using this consumer
CachedKafkaConsumer.releaseKafkaConsumer(
offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
}
consumer.release()
}

private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
@@ -52,7 +52,7 @@ private[kafka010] case class KafkaSourceRDDPartition(
* An RDD that reads data from Kafka based on offset ranges across multiple partitions.
* Additionally, it allows preferred locations to be set for each topic + partition, so that
* the [[KafkaSource]] can ensure the same executor always reads the same topic + partition
* and cached KafkaConsumers (see [[CachedKafkaConsumer]]) can be used to read data efficiently.
* and cached KafkaConsumers (see [[KafkaDataConsumer]]) can be used to read data efficiently.
*
* @param sc the [[SparkContext]]
* @param executorKafkaParams Kafka configuration for creating KafkaConsumer on the executors
@@ -126,14 +126,9 @@ private[kafka010] class KafkaSourceRDD(
val sourcePartition = thePart.asInstanceOf[KafkaSourceRDDPartition]
val topic = sourcePartition.offsetRange.topic
val kafkaPartition = sourcePartition.offsetRange.partition
val consumer =
if (!reuseKafkaConsumer) {
// If we can't reuse CachedKafkaConsumers, creating a new CachedKafkaConsumer. As here we
// uses `assign`, we don't need to worry about the "group.id" conflicts.
CachedKafkaConsumer.createUncached(topic, kafkaPartition, executorKafkaParams)
} else {
CachedKafkaConsumer.getOrCreate(topic, kafkaPartition, executorKafkaParams)
}
val consumer = KafkaDataConsumer.acquire(
sourcePartition.offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)

val range = resolveRange(consumer, sourcePartition.offsetRange)
assert(
range.fromOffset <= range.untilOffset,
@@ -167,13 +162,7 @@ private[kafka010] class KafkaSourceRDD(
}

override protected def close(): Unit = {
if (!reuseKafkaConsumer) {
// Don't forget to close non-reuse KafkaConsumers. You may take down your cluster!
consumer.close()
} else {
// Indicate that we're no longer using this consumer
CachedKafkaConsumer.releaseKafkaConsumer(topic, kafkaPartition, executorKafkaParams)
}
consumer.release()
}
}
// Release consumer, either by removing it or indicating we're no longer using it
@@ -184,7 +173,7 @@ private[kafka010] class KafkaSourceRDD(
}
}

private def resolveRange(consumer: CachedKafkaConsumer, range: KafkaSourceRDDOffsetRange) = {
private def resolveRange(consumer: KafkaDataConsumer, range: KafkaSourceRDDOffsetRange) = {
if (range.fromOffset < 0 || range.untilOffset < 0) {
// Late bind the offset range
val availableOffsetRange = consumer.getAvailableOffsetRange()
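resolveRange above late-binds the offset range: when a task starts with sentinel offsets, it asks the consumer for its currently available range and substitutes concrete offsets. The sketch below is the editor's simplified illustration of that idea; treating every negative offset as "earliest" or "latest" is an assumption made for the illustration, since the real code distinguishes specific sentinel values.

// Editor's sketch of late-binding sentinel offsets against the live range.
def lateBind(consumer: KafkaDataConsumer, fromOffset: Long, untilOffset: Long): (Long, Long) = {
  if (fromOffset >= 0 && untilOffset >= 0) {
    (fromOffset, untilOffset)
  } else {
    val available = consumer.getAvailableOffsetRange()
    val from = if (fromOffset < 0) available.earliest else fromOffset
    val until = if (untilOffset < 0) available.latest else untilOffset
    (from, until)
  }
}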

This file was deleted.
