[SPARK-23623] [SS] Avoid concurrent use of cached consumers in CachedKafkaConsumer #20767
Changes from 4 commits
@@ -27,30 +27,73 @@ import org.apache.kafka.common.TopicPartition | |
|
||
import org.apache.spark.{SparkEnv, SparkException, TaskContext} | ||
import org.apache.spark.internal.Logging | ||
import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange | ||
import org.apache.spark.sql.kafka010.KafkaSourceProvider._ | ||
import org.apache.spark.util.UninterruptibleThread | ||
|
||
private[kafka010] sealed trait KafkaDataConsumer { | ||
/** | ||
* Get the record for the given offset if available. Otherwise it will either throw error | ||
* (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), | ||
* or null. | ||
* | ||
* @param offset the offset to fetch. | ||
* @param untilOffset the max offset to fetch. Exclusive. | ||
* @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. | ||
* @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at | ||
* offset if available, or throw exception. When `failOnDataLoss` is `false`, | ||
* this method will either return record at offset if available, or return | ||
* the next earliest available record less than untilOffset, or null. It | ||
* will not throw any exception. | ||
*/ | ||
def get( | ||
offset: Long, | ||
untilOffset: Long, | ||
pollTimeoutMs: Long, | ||
failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { | ||
internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss) | ||
} | ||
|
||
/** | ||
* Return the available offset range of the current partition. It's a pair of the earliest offset | ||
* and the latest offset. | ||
*/ | ||
def getAvailableOffsetRange(): AvailableOffsetRange = internalConsumer.getAvailableOffsetRange() | ||
|
||
/** | ||
* Release this consumer from being further used. Depending on its implementation, | ||
* this consumer will be either finalized, or reset for reuse later. | ||
*/ | ||
def release(): Unit | ||
|
||
/** Reference to the internal implementation that this wrapper delegates to */ | ||
protected def internalConsumer: InternalKafkaConsumer | ||
} | ||
|
||
|
||
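As context for the new trait above, here is a minimal caller sketch (hypothetical, not part of this diff) of the intended acquire/get/release lifecycle. The object name, readRange, and all of its parameters are illustrative names, not code from the PR, and the sketch would have to live in the org.apache.spark.sql.kafka010 package because the API is package-private.

import java.{util => ju}
import org.apache.kafka.common.TopicPartition

object KafkaDataConsumerUsageSketch {
  // Hypothetical caller; parameter names are illustrative, not from the PR.
  def readRange(
      topicPartition: TopicPartition,
      kafkaParams: ju.Map[String, Object],
      startOffset: Long,
      endOffset: Long,
      pollTimeoutMs: Long): Unit = {
    val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = true)
    try {
      var offset = startOffset
      while (offset < endOffset) {
        // With failOnDataLoss = false this returns the record at `offset`, or the next
        // available record below `endOffset`, or null; it does not throw on missing data.
        val record = consumer.get(offset, endOffset, pollTimeoutMs, failOnDataLoss = false)
        if (record == null) {
          offset = endOffset            // nothing more available in the requested range
        } else {
          // process(record) would go here
          offset = record.offset + 1
        }
      }
    } finally {
      consumer.release()                // always release so a cached instance can be reused
    }
  }
}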
/** | ||
* Consumer of single topicpartition, intended for cached reuse. | ||
* Underlying consumer is not threadsafe, so neither is this, | ||
* but processing the same topicpartition and group id in multiple threads is usually bad anyway. | ||
* A wrapper around Kafka's KafkaConsumer that throws error when data loss is detected. | ||
* This is not for direct use outside this file. | ||
*/ | ||
private[kafka010] case class CachedKafkaConsumer private( | ||
private[kafka010] case class InternalKafkaConsumer( | ||
topicPartition: TopicPartition, | ||
kafkaParams: ju.Map[String, Object]) extends Logging { | ||
import CachedKafkaConsumer._ | ||
import InternalKafkaConsumer._ | ||
|
||
private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] | ||
|
||
private var consumer = createConsumer | ||
@volatile private var consumer = createConsumer | ||
|
||
/** indicates whether this consumer is in use or not */ | ||
private var inuse = true | ||
@volatile var inuse = true | ||
|
||
/** indicate whether this consumer is going to be stopped in the next release */ | ||
@volatile var markedForClose = false | ||
|
||
/** Iterator to the already fetch data */ | ||
private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] | ||
private var nextOffsetInFetchedData = UNKNOWN_OFFSET | ||
@volatile private var fetchedData = | ||
ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] | ||
@volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET | ||
|
||
/** Create a KafkaConsumer to fetch records for `topicPartition` */ | ||
private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = { | ||
|
@@ -61,8 +104,6 @@ private[kafka010] case class CachedKafkaConsumer private( | |
c | ||
} | ||
|
||
case class AvailableOffsetRange(earliest: Long, latest: Long) | ||
|
||
private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match { | ||
case ut: UninterruptibleThread => | ||
ut.runUninterruptibly(body) | ||
|
@@ -313,21 +354,53 @@ private[kafka010] case class CachedKafkaConsumer private( | |
} | ||
} | ||
|
||
private[kafka010] object CachedKafkaConsumer extends Logging { | ||
|
||
private val UNKNOWN_OFFSET = -2L | ||
private[kafka010] object KafkaDataConsumer extends Logging { | ||
|
||
private case class CacheKey(groupId: String, topicPartition: TopicPartition) | ||
case class AvailableOffsetRange(earliest: Long, latest: Long) | ||
|
||
private case class CachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer) | ||
extends KafkaDataConsumer { | ||
override def release(): Unit = { releaseConsumer(internalConsumer) } | ||
} | ||
|
||
private case class NonCachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer) | ||
extends KafkaDataConsumer { | ||
override def release(): Unit = { internalConsumer.close() } | ||
} | ||
|
||
private case class CacheKey(groupId: String, topicPartition: TopicPartition) { | ||
def this(topicPartition: TopicPartition, kafkaParams: ju.Map[String, Object]) = | ||
this(kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String], topicPartition) | ||
} | ||
|
||
// This cache has the following important properties. | ||
// - We make a best-effort attempt to maintain the max size of the cache as configured capacity. | ||
// The capacity is not guaranteed to be maintained, especially when there are more active | ||
// tasks simultaneously using consumers than the capacity. | ||
// - A cached consumer will not be cleared while it is in use. This is an invariant that | ||
// we maintain. | ||
private lazy val cache = { | ||
val conf = SparkEnv.get.conf | ||
val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64) | ||
new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer](capacity, 0.75f, true) { | ||
new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer](capacity, 0.75f, true) { | ||
override def removeEldestEntry( | ||
entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer]): Boolean = { | ||
entry: ju.Map.Entry[CacheKey, InternalKafkaConsumer]): Boolean = { | ||
|
||
// Try to remove the least-used entry if it's currently not in use. This maintains the | ||
// invariant mentioned in the docs above. | ||
// | ||
// If you cannot remove it, then the cache will keep growing. In the worst case, | ||
// the cache will grow to the max number of concurrent tasks that can run in the executor, | ||
// (that is, number of task slots) after which it will never reduce. This is unlikely to | ||
// be a serious problem because an executor with more than 64 (default) task slots is | ||
// likely running on a beefy machine that can handle a large number of simultaneously | ||
// active consumers. | ||
|
||
if (entry.getValue.inuse == false && this.size > capacity) { | ||
logWarning(s"KafkaConsumer cache hitting max capacity of $capacity, " + | ||
s"removing consumer for ${entry.getKey}") | ||
logWarning( | ||
s"KafkaConsumer cache hitting max capacity of $capacity, " + | ||
s"removing consumer for ${entry.getKey}") | ||
try { | ||
entry.getValue.close() | ||
} catch { | ||
|
@@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends Logging { | |
} | ||
} | ||
|
||
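For readers less familiar with this pattern, the cache above is an access-ordered java.util.LinkedHashMap whose removeEldestEntry hook refuses to evict entries that are still in use. A small standalone sketch of the same idea follows; it is illustrative only, and InUseAwareCache and its parameters are made-up names, not the PR's code.

import java.{util => ju}

// Illustrative only: a size-bounded, access-ordered cache that never evicts in-use values.
class InUseAwareCache[K, V](capacity: Int, isInUse: V => Boolean, close: V => Unit) {
  private val map = new ju.LinkedHashMap[K, V](capacity, 0.75f, /* accessOrder = */ true) {
    override def removeEldestEntry(eldest: ju.Map.Entry[K, V]): Boolean = {
      // Evict only when over capacity and the least-recently-used entry is idle;
      // otherwise the map is allowed to grow beyond `capacity` (best effort, as above).
      if (size() > capacity && !isInUse(eldest.getValue)) {
        try close(eldest.getValue) catch { case _: Exception => () }  // don't fail the put
        true                                  // returning true drops the eldest entry
      } else {
        false
      }
    }
  }

  def put(key: K, value: V): Unit = synchronized { map.put(key, value) }
  def get(key: K): Option[V] = synchronized { Option(map.get(key)) }
}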
def releaseKafkaConsumer( | ||
topic: String, | ||
partition: Int, | ||
kafkaParams: ju.Map[String, Object]): Unit = { | ||
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] | ||
val topicPartition = new TopicPartition(topic, partition) | ||
val key = CacheKey(groupId, topicPartition) | ||
|
||
private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = { | ||
synchronized { | ||
val consumer = cache.get(key) | ||
if (consumer != null) { | ||
consumer.inuse = false | ||
} else { | ||
logWarning(s"Attempting to release consumer that does not exist") | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Removes (and closes) the Kafka Consumer for the given topic, partition and group id. | ||
*/ | ||
def removeKafkaConsumer( | ||
topic: String, | ||
partition: Int, | ||
kafkaParams: ju.Map[String, Object]): Unit = { | ||
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] | ||
val topicPartition = new TopicPartition(topic, partition) | ||
val key = CacheKey(groupId, topicPartition) | ||
|
||
synchronized { | ||
val removedConsumer = cache.remove(key) | ||
if (removedConsumer != null) { | ||
removedConsumer.close() | ||
// If it has been marked for close, then do it anyway | ||
if (intConsumer.inuse && intConsumer.markedForClose) intConsumer.close() | ||
Review comment: Is it possible we have the following condition - should intConsumer.close() be called? !intConsumer.inuse && intConsumer.markedForClose
Reply: I rewrote the logic. Hopefully, it's simpler to reason about it now.
||
intConsumer.inuse = false | ||
|
||
// Clear the consumer from the cache if this is indeed the consumer present in the cache | ||
val key = new CacheKey(intConsumer.topicPartition, intConsumer.kafkaParams) | ||
val cachedIntConsumer = cache.get(key) | ||
if (cachedIntConsumer != null) { | ||
if (cachedIntConsumer.eq(intConsumer)) { | ||
// The released consumer is indeed the cached one. | ||
cache.remove(key) | ||
Review comment: We should remove it only when it's closed.
||
} else { | ||
// The released consumer is not the cached one. Don't do anything. | ||
// This should not happen as long as we maintain the invariant mentioned above. | ||
logWarning( | ||
s"Cached consumer not the same one as the one being release" + | ||
s"\ncached = $cachedIntConsumer [${System.identityHashCode(cachedIntConsumer)}]" + | ||
s"\nreleased = $intConsumer [${System.identityHashCode(intConsumer)}]") | ||
} | ||
} else { | ||
// The released consumer is not in the cache. Don't do anything. | ||
// This should not happen as long as we maintain the invariant mentioned above. | ||
logWarning(s"Attempting to release consumer that is not in the cache") | ||
} | ||
} | ||
} | ||
|
||
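To make the close-ordering question from the review thread above concrete, the close decision at release time in this revision boils down to the following sketch for discussion; shouldCloseOnRelease is a made-up helper, not code from the PR.

// Sketch for discussion only: the close condition used by release() in this revision.
def shouldCloseOnRelease(inuse: Boolean, markedForClose: Boolean): Boolean =
  inuse && markedForClose
// Note that (inuse = false, markedForClose = true) yields false here, which is exactly
// the combination the reviewer asks about; the author's follow-up rewrote this logic.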
/** | ||
* Get a cached consumer for groupId, assigned to topic and partition. | ||
* If a matching consumer doesn't already exist, it will be created using kafkaParams. | ||
* The returned consumer must be released explicitly using [[KafkaDataConsumer.release()]]. | ||
* | ||
* Note: This method guarantees that the consumer returned is not currently in use by anyone | ||
* else. Within this guarantee, this will make a best effort attempt to re-use consumers by | ||
* caching them and tracking when they are in use. | ||
*/ | ||
def getOrCreate( | ||
topic: String, | ||
partition: Int, | ||
kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized { | ||
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] | ||
val topicPartition = new TopicPartition(topic, partition) | ||
val key = CacheKey(groupId, topicPartition) | ||
|
||
// If this is reattempt at running the task, then invalidate cache and start with | ||
// a new consumer | ||
def acquire( | ||
topicPartition: TopicPartition, | ||
kafkaParams: ju.Map[String, Object], | ||
useCache: Boolean): KafkaDataConsumer = synchronized { | ||
val key = new CacheKey(topicPartition, kafkaParams) | ||
val existingInternalConsumer = cache.get(key) | ||
|
||
lazy val newInternalConsumer = new InternalKafkaConsumer(topicPartition, kafkaParams) | ||
|
||
if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) { | ||
removeKafkaConsumer(topic, partition, kafkaParams) | ||
val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams) | ||
consumer.inuse = true | ||
cache.put(key, consumer) | ||
consumer | ||
} else { | ||
if (!cache.containsKey(key)) { | ||
cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams)) | ||
// If this is a reattempt at running the task, then invalidate the cached consumer if any and | ||
// start with a new one. | ||
if (existingInternalConsumer != null) { | ||
Review comment: This logic here seems wrong. I think it should be something like this?
Reply: This is indeed better. What I was doing was always deferring to a later point. But that would lead to it being used one more time before being closed.
||
if (existingInternalConsumer.inuse) { | ||
// Consumer exists in cache and is somehow in use. Don't close it immediately, but | ||
// mark it for being closed when it is released. | ||
existingInternalConsumer.markedForClose = true | ||
NonCachedKafkaDataConsumer(newInternalConsumer) | ||
|
||
} else { | ||
// Consumer exists in cache and is not in use, so close it immediately and replace | ||
// it with a new one. | ||
existingInternalConsumer.close() | ||
cache.put(key, newInternalConsumer) | ||
CachedKafkaDataConsumer(newInternalConsumer) | ||
|
||
} | ||
} else { | ||
// Consumer is not cached, put the new one in the cache | ||
cache.put(key, newInternalConsumer) | ||
Review comment: Should …
Reply: yes. correct. thanks!
||
CachedKafkaDataConsumer(newInternalConsumer) | ||
|
||
} | ||
val consumer = cache.get(key) | ||
consumer.inuse = true | ||
consumer | ||
} else if (!useCache) { | ||
Review thread:
- this …
- why? I am saying that we should NOT reuse consumers for ANY task retries, independent of the desire to use the cache or not.
- When …
- Technically that won't happen because the continuous query and the batch query will have different group ids. But I agree that if …
||
// If planner asks to not reuse consumers, then do not use it, return a new consumer | ||
NonCachedKafkaDataConsumer(newInternalConsumer) | ||
|
||
} else if (existingInternalConsumer == null) { | ||
// If consumer is not already cached, then put a new one in the cache and return it | ||
newInternalConsumer.inuse = true | ||
cache.put(key, newInternalConsumer) | ||
CachedKafkaDataConsumer(newInternalConsumer) | ||
|
||
} else if (existingInternalConsumer.inuse) { | ||
// If consumer is already cached but is currently in use, then return a new consumer | ||
NonCachedKafkaDataConsumer(newInternalConsumer) | ||
Review comment: Maybe keep an internal counter for how many times the non-cached consumer is created.
||
|
||
} else { | ||
// If consumer is already cached and is currently not in use, then return that consumer | ||
CachedKafkaDataConsumer(existingInternalConsumer) | ||
Review comment: we should set …
Reply: I wonder why this was not caught in the stress test.
Reply: I will run a longer stress test locally just to be sure.
||
|
||
} | ||
} | ||
} | ||
|
||
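Putting the branches above together, the intent of the new acquire() can be restated compactly as follows. This is a simplified, hypothetical restatement for readability, not the PR's exact code; cache insertion/replacement details are abbreviated in comments, and acquireSketch is a made-up name.

// Simplified restatement of acquire() in this revision (not the exact PR code).
def acquireSketch(
    existing: InternalKafkaConsumer,           // the cached consumer for this key, or null
    isTaskRetry: Boolean,                      // i.e. TaskContext.get.attemptNumber >= 1
    useCache: Boolean,
    newConsumer: => InternalKafkaConsumer): KafkaDataConsumer = {
  if (isTaskRetry) {
    // After a task failure, never hand back the possibly-bad cached consumer.
    if (existing != null && existing.inuse) {
      existing.markedForClose = true           // close it once its current user releases it
      NonCachedKafkaDataConsumer(newConsumer)
    } else {
      if (existing != null) existing.close()   // idle: close and replace immediately
      CachedKafkaDataConsumer(newConsumer)     // the new one also replaces the cache entry
    }
  } else if (!useCache) {
    NonCachedKafkaDataConsumer(newConsumer)    // planner opted out of consumer reuse
  } else if (existing == null) {
    CachedKafkaDataConsumer(newConsumer)       // also inserted into the cache
  } else if (existing.inuse) {
    NonCachedKafkaDataConsumer(newConsumer)    // cached consumer is busy; don't share it
  } else {
    CachedKafkaDataConsumer(existing)          // reuse the idle cached consumer
  }
}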
/** Create an [[CachedKafkaConsumer]] but don't put it into cache. */ | ||
def createUncached( | ||
topic: String, | ||
partition: Int, | ||
kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = { | ||
new CachedKafkaConsumer(new TopicPartition(topic, partition), kafkaParams) | ||
} | ||
private[kafka010] object InternalKafkaConsumer extends Logging { | ||
|
||
private val UNKNOWN_OFFSET = -2L | ||
|
||
private def reportDataLoss0( | ||
failOnDataLoss: Boolean, | ||
|
Review comment: I think these @volatiles are not necessary. I'm okay with them though.
Reply: yeah, I just added them to be safer. One less thing to worry about.
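For context on that exchange, @volatile in Scala maps to a Java volatile field, guaranteeing that a write made by one thread is visible to readers in other threads. A generic illustration of the kind of flag it protects, unrelated to the PR's code:

// Generic illustration of @volatile visibility, unrelated to the PR's code.
class Worker extends Runnable {
  @volatile private var stopped = false   // without @volatile, run() may never observe the write

  def stop(): Unit = { stopped = true }   // typically called from a different thread

  override def run(): Unit = {
    while (!stopped) {
      // do some work
    }
  }
}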