Skip to content


[SPARK-23623][SS] Avoid concurrent use of cached consumers in CachedK…
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?

CacheKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one task using trying to read the same Kafka TopicPartition at the same time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. And any cases where this assumption may not be true, we have SparkPlan flag to disable the use of a cache. So it was up to the planner to correctly identify when it was not safe to use the cache and set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, whether there will be multiple tasks in the same query trying to read the same partition. Case in point, 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's pretty non-trivial to figure out how this leads to two tasks reading the same partition twice, possibly concurrently. And due to the non-triviality, it is hard to figure this out in the planner and set the flag to avoid the cache / consumer pool. And this can inadvertently lead to ConcurrentModificationException ,or worse, silent reading of incorrect data.

Here is a better way to design this. The planner shouldnt have to understand these low-level optimizations. Rather the consumer pool should be smart enough avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag inuse is not checked when returning a cached consumer, see [this]( If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that should be closed when the task is done. Then the planner does not have to have a flag to avoid reuses.

This PR is a step towards that goal. It does the following.
- There are effectively two kinds of consumer that may be generated
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait called KafkaConsumer is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply called `val consumer = KafkaConsumer.acquire` and then `consumer.release()`.
- If there is request for a consumer that is in-use, then a new consumer is generated.
- If there is a concurrent attempt of the same task, then a new consumer is generated, and the existing cached consumer is marked for close upon release.
- In addition, I renamed the classes because CachedKafkaConsumer is a misnomer given that what it returns may or may not be cached.

This PR does not remove the planner flag to avoid reuse to make this patch safe enough for merging in branch-2.3. This can be done later in master-only.

## How was this patch tested?
A new stress test that verifies it is safe to concurrently get consumers for the same partition from the consumer pool.

Author: Tathagata Das <>

Closes apache#20767 from tdas/SPARK-23623.
  • Loading branch information
tdas authored and mstewart141 committed Mar 24, 2018
1 parent 7e6a978 commit 43d5f0f
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,7 @@ class KafkaContinuousDataReader(
kafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] {
private val consumer =
CachedKafkaConsumer.createUncached(topicPartition.topic, topicPartition.partition, kafkaParams)
private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
private val converter = new KafkaRecordToUnsafeRowConverter

private var nextKafkaOffset = startOffset
Expand Down Expand Up @@ -245,6 +244,6 @@ class KafkaContinuousDataReader(

override def close(): Unit = {
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,73 @@ import org.apache.kafka.common.TopicPartition

import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread

private[kafka010] sealed trait KafkaDataConsumer {
* Get the record for the given offset if available. Otherwise it will either throw error
* (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
* or null.
* @param offset the offset to fetch.
* @param untilOffset the max offset to fetch. Exclusive.
* @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
* @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
* offset if available, or throw exception.when `failOnDataLoss` is `false`,
* this method will either return record at offset if available, or return
* the next earliest available record less than untilOffset, or null. It
* will not throw any exception.
def get(
offset: Long,
untilOffset: Long,
pollTimeoutMs: Long,
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss)

* Return the available offset range of the current partition. It's a pair of the earliest offset
* and the latest offset.
def getAvailableOffsetRange(): AvailableOffsetRange = internalConsumer.getAvailableOffsetRange()

* Release this consumer from being further used. Depending on its implementation,
* this consumer will be either finalized, or reset for reuse later.
def release(): Unit

/** Reference to the internal implementation that this wrapper delegates to */
protected def internalConsumer: InternalKafkaConsumer

* Consumer of single topicpartition, intended for cached reuse.
* Underlying consumer is not threadsafe, so neither is this,
* but processing the same topicpartition and group id in multiple threads is usually bad anyway.
* A wrapper around Kafka's KafkaConsumer that throws error when data loss is detected.
* This is not for direct use outside this file.
private[kafka010] case class CachedKafkaConsumer private(
private[kafka010] case class InternalKafkaConsumer(
topicPartition: TopicPartition,
kafkaParams: ju.Map[String, Object]) extends Logging {
import CachedKafkaConsumer._
import InternalKafkaConsumer._

private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

private var consumer = createConsumer
@volatile private var consumer = createConsumer

/** indicates whether this consumer is in use or not */
private var inuse = true
@volatile var inUse = true

/** indicate whether this consumer is going to be stopped in the next release */
@volatile var markedForClose = false

/** Iterator to the already fetch data */
private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
private var nextOffsetInFetchedData = UNKNOWN_OFFSET
@volatile private var fetchedData =
ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
@volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET

/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
Expand All @@ -61,8 +104,6 @@ private[kafka010] case class CachedKafkaConsumer private(

case class AvailableOffsetRange(earliest: Long, latest: Long)

private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match {
case ut: UninterruptibleThread =>
Expand Down Expand Up @@ -313,21 +354,51 @@ private[kafka010] case class CachedKafkaConsumer private(

private[kafka010] object CachedKafkaConsumer extends Logging {

private val UNKNOWN_OFFSET = -2L
private[kafka010] object KafkaDataConsumer extends Logging {

case class AvailableOffsetRange(earliest: Long, latest: Long)

private case class CachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer)
extends KafkaDataConsumer {
assert(internalConsumer.inUse) // make sure this has been set to true
override def release(): Unit = { KafkaDataConsumer.release(internalConsumer) }

private case class NonCachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer)
extends KafkaDataConsumer {
override def release(): Unit = { internalConsumer.close() }

private case class CacheKey(groupId: String, topicPartition: TopicPartition)
private case class CacheKey(groupId: String, topicPartition: TopicPartition) {
def this(topicPartition: TopicPartition, kafkaParams: ju.Map[String, Object]) =
this(kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String], topicPartition)

// This cache has the following important properties.
// - We make a best-effort attempt to maintain the max size of the cache as configured capacity.
// The capacity is not guaranteed to be maintained, especially when there are more active
// tasks simultaneously using consumers than the capacity.
private lazy val cache = {
val conf = SparkEnv.get.conf
val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) {
new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer](capacity, 0.75f, true) {
override def removeEldestEntry(
entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = {
if (entry.getValue.inuse == false && this.size > capacity) {
logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " +
s"removing consumer for ${entry.getKey}")
entry: ju.Map.Entry[CacheKey, InternalKafkaConsumer]): Boolean = {

// Try to remove the least-used entry if its currently not in use.
// If you cannot remove it, then the cache will keep growing. In the worst case,
// the cache will grow to the max number of concurrent tasks that can run in the executor,
// (that is, number of tasks slots) after which it will never reduce. This is unlikely to
// be a serious problem because an executor with more than 64 (default) tasks slots is
// likely running on a beefy machine that can handle a large number of simultaneously
// active consumers.

if (entry.getValue.inUse == false && this.size > capacity) {
s"KafkaConsumer cache hitting max capacity of $capacity, " +
s"removing consumer for ${entry.getKey}")
try {
} catch {
Expand All @@ -342,80 +413,87 @@ private[kafka010] object CachedKafkaConsumer extends Logging {

def releaseKafkaConsumer(
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): Unit = {
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
val topicPartition = new TopicPartition(topic, partition)
val key = CacheKey(groupId, topicPartition)

synchronized {
val consumer = cache.get(key)
if (consumer != null) {
consumer.inuse = false
} else {
logWarning(s"Attempting to release consumer that does not exist")

* Removes (and closes) the Kafka Consumer for the given topic, partition and group id.
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using kafkaParams.
* The returned consumer must be released explicitly using [[KafkaDataConsumer.release()]].
* Note: This method guarantees that the consumer returned is not currently in use by any one
* else. Within this guarantee, this method will make a best effort attempt to re-use consumers by
* caching them and tracking when they are in use.
def removeKafkaConsumer(
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): Unit = {
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
val topicPartition = new TopicPartition(topic, partition)
val key = CacheKey(groupId, topicPartition)
def acquire(
topicPartition: TopicPartition,
kafkaParams: ju.Map[String, Object],
useCache: Boolean): KafkaDataConsumer = synchronized {
val key = new CacheKey(topicPartition, kafkaParams)
val existingInternalConsumer = cache.get(key)

synchronized {
val removedConsumer = cache.remove(key)
if (removedConsumer != null) {
lazy val newInternalConsumer = new InternalKafkaConsumer(topicPartition, kafkaParams)

if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
// If this is reattempt at running the task, then invalidate cached consumer if any and
// start with a new one.
if (existingInternalConsumer != null) {
// Consumer exists in cache. If its in use, mark it for closing later, or close it now.
if (existingInternalConsumer.inUse) {
existingInternalConsumer.markedForClose = true
} else {
cache.remove(key) // Invalidate the cache in any case

} else if (!useCache) {
// If planner asks to not reuse consumers, then do not use it, return a new consumer

} else if (existingInternalConsumer == null) {
// If consumer is not already cached, then put a new in the cache and return it
cache.put(key, newInternalConsumer)
newInternalConsumer.inUse = true

} else if (existingInternalConsumer.inUse) {
// If consumer is already cached but is currently in use, then return a new consumer

} else {
// If consumer is already cached and is currently not in use, then return that consumer
existingInternalConsumer.inUse = true

* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using kafkaParams.
def getOrCreate(
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized {
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
val topicPartition = new TopicPartition(topic, partition)
val key = CacheKey(groupId, topicPartition)

// If this is reattempt at running the task, then invalidate cache and start with
// a new consumer
if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
removeKafkaConsumer(topic, partition, kafkaParams)
val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
consumer.inuse = true
cache.put(key, consumer)
} else {
if (!cache.containsKey(key)) {
cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams))
private def release(intConsumer: InternalKafkaConsumer): Unit = {
synchronized {

// Clear the consumer from the cache if this is indeed the consumer present in the cache
val key = new CacheKey(intConsumer.topicPartition, intConsumer.kafkaParams)
val cachedIntConsumer = cache.get(key)
if (intConsumer.eq(cachedIntConsumer)) {
// The released consumer is the same object as the cached one.
if (intConsumer.markedForClose) {
} else {
intConsumer.inUse = false
} else {
// The released consumer is either not the same one as in the cache, or not in the cache
// at all. This may happen if the cache was invalidate while this consumer was being used.
// Just close this consumer.
logInfo(s"Released a supposedly cached consumer that was not found in the cache")
val consumer = cache.get(key)
consumer.inuse = true

/** Create an [[CachedKafkaConsumer]] but don't put it into cache. */
def createUncached(
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = {
new CachedKafkaConsumer(new TopicPartition(topic, partition), kafkaParams)
private[kafka010] object InternalKafkaConsumer extends Logging {

private val UNKNOWN_OFFSET = -2L

private def reportDataLoss0(
failOnDataLoss: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,17 +321,8 @@ private[kafka010] case class KafkaMicroBatchDataReader(
failOnDataLoss: Boolean,
reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with Logging {

private val consumer = {
if (!reuseKafkaConsumer) {
// If we can't reuse CachedKafkaConsumers, creating a new CachedKafkaConsumer. We
// uses `assign` here, hence we don't need to worry about the "" conflicts.
offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
} else {
offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
private val consumer = KafkaDataConsumer.acquire(
offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)

private val rangeToRead = resolveRange(offsetRange)
private val converter = new KafkaRecordToUnsafeRowConverter
Expand Down Expand Up @@ -360,14 +351,7 @@ private[kafka010] case class KafkaMicroBatchDataReader(

override def close(): Unit = {
if (!reuseKafkaConsumer) {
// Don't forget to close non-reuse KafkaConsumers. You may take down your cluster!
} else {
// Indicate that we're no longer using this consumer
offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)

private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
Expand Down

0 comments on commit 43d5f0f

Please sign in to comment.