[SPARK-23623] [SS] Avoid concurrent use of cached consumers in CachedKafkaConsumer #20767

Closed
wants to merge 5 commits
Changes from 1 commit
@@ -85,7 +85,7 @@ private[kafka010] case class InternalKafkaConsumer(
@volatile private var consumer = createConsumer
Member

I think these @volatiles are not necessary. I'm okay with them though.

Contributor Author

Yeah, I just added them to be safer. One less thing to worry about.
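For context, a minimal sketch (not code from this PR; the class name is made up) of the general visibility issue that @volatile guards against when one thread sets a flag that another thread reads outside a lock:

import scala.annotation.unused

// Illustrative only: without @volatile, an update made by one thread may not become
// visible to another thread that reads the flag without synchronization.
@unused
class VolatileFlagSketch {
  @volatile private var markedForClose = false

  def requestClose(): Unit = { markedForClose = true } // called from the releasing thread

  def shouldClose: Boolean = markedForClose            // read from the thread using the consumer
}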


/** indicates whether this consumer is in use or not */
@volatile var inuse = true
@volatile var inUse = true

/** indicate whether this consumer is going to be stopped in the next release */
@volatile var markedForClose = false
@@ -361,7 +361,8 @@ private[kafka010] object KafkaDataConsumer extends Logging {

private case class CachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer)
extends KafkaDataConsumer {
override def release(): Unit = { releaseConsumer(internalConsumer) }
assert(internalConsumer.inUse) // make sure this has been set to true
override def release(): Unit = { KafkaDataConsumer.release(internalConsumer) }
}

private case class NonCachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer)
@@ -378,17 +379,14 @@ private[kafka010] object KafkaDataConsumer extends Logging {
// - We make a best-effort attempt to maintain the max size of the cache as configured capacity.
// The capacity is not guaranteed to be maintained, especially when there are more active
// tasks simultaneously using consumers than the capacity.
// - A cached consumer will not be cleared while it is in use. This is an invariant that
// we maintain.
private lazy val cache = {
val conf = SparkEnv.get.conf
val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer](capacity, 0.75f, true) {
override def removeEldestEntry(
entry: ju.Map.Entry[CacheKey, InternalKafkaConsumer]): Boolean = {

// Try to remove the least-used entry if its currently not in use. This maintains the
// invariant mentioned in the docs above.
// Try to remove the least-used entry if it's currently not in use.
//
// If you cannot remove it, then the cache will keep growing. In the worst case,
// the cache will grow to the max number of concurrent tasks that can run in the executor,
@@ -397,7 +395,7 @@ private[kafka010] object KafkaDataConsumer extends Logging {
// likely running on a beefy machine that can handle a large number of simultaneously
// active consumers.

if (entry.getValue.inuse == false && this.size > capacity) {
if (entry.getValue.inUse == false && this.size > capacity) {
logWarning(
s"KafkaConsumer cache hitting max capacity of $capacity, " +
s"removing consumer for ${entry.getKey}")
@@ -415,43 +413,13 @@
}
}
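As background on the cache defined above, here is a self-contained sketch (illustrative only, not code from this PR; the object and field names are made up) of how an access-ordered java.util.LinkedHashMap with removeEldestEntry behaves as a best-effort bound:

import java.{util => ju}

// Illustrative only: removeEldestEntry is consulted on every put, but eviction can be
// refused, so `capacity` acts as a soft bound rather than a hard limit.
object LruCacheSketch {
  final case class Entry(id: Int, var inUse: Boolean)

  def newCache(capacity: Int): ju.LinkedHashMap[String, Entry] =
    new ju.LinkedHashMap[String, Entry](capacity, 0.75f, true) {
      override def removeEldestEntry(eldest: ju.Map.Entry[String, Entry]): Boolean =
        // Evict only when over capacity and the least-recently-accessed entry is idle.
        !eldest.getValue.inUse && this.size > capacity
    }

  def main(args: Array[String]): Unit = {
    val cache = newCache(capacity = 2)
    cache.put("a", Entry(1, inUse = true))
    cache.put("b", Entry(2, inUse = false))
    cache.put("c", Entry(3, inUse = false)) // eldest ("a") is busy, so nothing is evicted
    println(cache.size())                   // prints 3: the cache grew past its capacity
  }
}

With accessOrder = true the eldest entry is the least-recently-accessed one, so refusing eviction for in-use entries yields exactly the best-effort capacity described in the comments above.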

private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
synchronized {

// If it has been marked for close, then do it any way
if (intConsumer.inuse && intConsumer.markedForClose) intConsumer.close()
intConsumer.inuse = false

// Clear the consumer from the cache if this is indeed the consumer present in the cache
val key = new CacheKey(intConsumer.topicPartition, intConsumer.kafkaParams)
val cachedIntConsumer = cache.get(key)
if (cachedIntConsumer != null) {
if (cachedIntConsumer.eq(intConsumer)) {
// The released consumer is indeed the cached one.
cache.remove(key)
} else {
// The released consumer is not the cached one. Don't do anything.
// This should not happen as long as we maintain the invariant mentioned above.
logWarning(
s"Cached consumer not the same one as the one being release" +
s"\ncached = $cachedIntConsumer [${System.identityHashCode(cachedIntConsumer)}]" +
s"\nreleased = $intConsumer [${System.identityHashCode(intConsumer)}]")
}
} else {
// The released consumer is not in the cache. Don't do anything.
// This should not happen as long as we maintain the invariant mentioned above.
logWarning(s"Attempting to release consumer that is not in the cache")
}
}
}

/**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using kafkaParams.
* The returned consumer must be released explicitly using [[KafkaDataConsumer.release()]].
*
* Note: This method guarantees that the consumer returned is not currently in use by any one
* else. Within this guarantee, this will make a best effort attempt to re-use consumers by
* else. Within this guarantee, this method will make a best effort attempt to re-use consumers by
* caching them and tracking when they are in use.
*/
def acquire(
@@ -467,44 +435,58 @@ private[kafka010] object KafkaDataConsumer extends Logging {
// If this is reattempt at running the task, then invalidate cached consumer if any and
// start with a new one.
if (existingInternalConsumer != null) {
Member

This logic here seems wrong. I think it should be something like this?

      if (existingInternalConsumer != null) {
        if (existingInternalConsumer.inuse) {
          existingInternalConsumer.markedForClose = true
          newNonCachedConsumer
        } else {
          existingInternalConsumer.close()
          cache.put(key, newNonCachedConsumer.internalConsumer)
          new CachedKafkaDataConsumer(newNonCachedConsumer.internalConsumer)
        }
      } else {
        cache.put(key, newNonCachedConsumer.internalConsumer)
        new CachedKafkaDataConsumer(newNonCachedConsumer.internalConsumer)
      }

Contributor Author

This is indeed better. What I was doing was always deferring the close to a later point, but that would lead to the consumer being used one more time before being closed.

if (existingInternalConsumer.inuse) {
// Consumer exists in cache and is somehow in use. Don't close it immediately, but
// mark it for being closed when it is released.
// Consumer exists in cache. If it's in use, mark it for closing later, or close it now.
if (existingInternalConsumer.inUse) {
existingInternalConsumer.markedForClose = true
NonCachedKafkaDataConsumer(newInternalConsumer)

} else {
// Consumer exists in cache and is not in use, so close it immediately and replace
// it with a new one.
existingInternalConsumer.close()
cache.put(key, newInternalConsumer)
CachedKafkaDataConsumer(newInternalConsumer)

}
} else {
// Consumer is not cached, put the new one in the cache
cache.put(key, newInternalConsumer)
CachedKafkaDataConsumer(newInternalConsumer)

}
cache.remove(key) // Invalidate the cache in any case
NonCachedKafkaDataConsumer(newInternalConsumer)

} else if (!useCache) {
Member

This if should be moved before the above if.

Contributor Author

Why? I am saying that we should NOT reuse consumers for ANY task retries, independent of the desire to use the cache or not.

Member

When useCache is false, I would expect that newInternalConsumer is never put into the cache. The above if may put newInternalConsumer into the cache. If we put a consumer used by a continuous processing query into the cache and assume the query never ends, it will prevent other micro-batch queries from putting a consumer reading the same topic partition into the cache.

Contributor Author
@tdas Mar 16, 2018

Technically that won't happen, because the continuous query and the batch query will have different group ids. But I agree that if useCache is false, then we should not put it in the cache in any way. In fact, we can simplify the task-retry case further by never putting the new consumer in the cache and only invalidating the existing cached one. The only scenario where this hurts a little is that the micro-batch immediately after the reattempt will create a new consumer. That's a tiny one-time cost in a scenario where the reattempt has already made things slightly costly anyway.

// If planner asks to not reuse consumers, then do not use it, return a new consumer
NonCachedKafkaDataConsumer(newInternalConsumer)

} else if (existingInternalConsumer == null) {
// If consumer is not already cached, then put a new one in the cache and return it
newInternalConsumer.inuse = true
cache.put(key, newInternalConsumer)
newInternalConsumer.inUse = true
CachedKafkaDataConsumer(newInternalConsumer)

} else if (existingInternalConsumer.inuse) {
} else if (existingInternalConsumer.inUse) {
// If consumer is already cached but is currently in use, then return a new consumer
NonCachedKafkaDataConsumer(newInternalConsumer)
Contributor

Maybe keep an internal counter for how many times a non-cached consumer is created.
This would give us information on how effective the cache is.
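A rough sketch of the counter being suggested here (the object name, method names, and where they would be called from are illustrative assumptions, not part of this PR):

import java.util.concurrent.atomic.AtomicLong

// Illustrative only: count how often acquire() is served from the cache versus
// falling back to a fresh, non-cached consumer, to gauge cache effectiveness.
object ConsumerCacheStats {
  private val cachedAcquires = new AtomicLong(0L)
  private val nonCachedAcquires = new AtomicLong(0L)

  def recordCached(): Unit = cachedAcquires.incrementAndGet()
  def recordNonCached(): Unit = nonCachedAcquires.incrementAndGet()

  def hitRatio: Double = {
    val hits = cachedAcquires.get().toDouble
    val total = hits + nonCachedAcquires.get()
    if (total == 0) 1.0 else hits / total
  }
}

The idea would be to call recordNonCached() in each branch of acquire that returns a NonCachedKafkaDataConsumer, recordCached() on the cached paths, and then log or export hitRatio periodically.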


} else {
// If consumer is already cached and is currently not in use, then return that consumer
existingInternalConsumer.inUse = true
CachedKafkaDataConsumer(existingInternalConsumer)
Member

We should set existingInternalConsumer.inUse = true.

Contributor Author

I wonder why this was not caught in the stress test.

Contributor Author

I will run a longer stress test locally just to be sure.

}
}

private def release(intConsumer: InternalKafkaConsumer): Unit = {
synchronized {

// Clear the consumer from the cache if this is indeed the consumer present in the cache
val key = new CacheKey(intConsumer.topicPartition, intConsumer.kafkaParams)
val cachedIntConsumer = cache.get(key)
if (intConsumer.eq(cachedIntConsumer)) {
// The released consumer is the same object as the cached one.
if (intConsumer.markedForClose) {
intConsumer.close()
cache.remove(key)
} else {
intConsumer.inUse = false
}
} else {
// The released consumer is either not the same one as in the cache, or not in the cache
// at all. This may happen if the cache was invalidated while this consumer was being used.
// Just close this consumer.
intConsumer.close()
logInfo(s"Released a supposedly cached consumer that was not found in the cache")
}
}
}
}
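To tie the acquire/release contract above together, here is a hedged usage sketch. The acquire parameter list is collapsed in the diff, so the exact signature, the TopicPartition value, and the wrapper object below are assumptions for illustration only:

import java.{util => ju}
import org.apache.kafka.common.TopicPartition

// Illustrative only: every acquire() is paired with release() in try/finally,
// so a cached consumer goes back to the pool even when the read fails.
object KafkaDataConsumerUsageSketch {
  def readOnePartition(kafkaParams: ju.Map[String, Object]): Unit = {
    val consumer = KafkaDataConsumer.acquire(
      new TopicPartition("some-topic", 0), kafkaParams, useCache = true)
    try {
      // ... fetch records with the acquired consumer here ...
    } finally {
      consumer.release()
    }
  }
}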
@@ -77,7 +77,7 @@ class KafkaDataConsumerSuite extends SharedSQLContext with PrivateMethodTester {
ENABLE_AUTO_COMMIT_CONFIG -> "false"
)

val numThreads = 50
val numThreads = 100
val numConsumerUsages = 500

@volatile var error: Throwable = null
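For context, a rough sketch of the concurrent acquire/use/release loop that this kind of stress test drives. The helper name, thread-pool setup, and timing are illustrative assumptions, not the suite's actual code:

import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import scala.util.Random

// Illustrative only: hammer the acquire/use/release path from many threads and
// record the first failure, mirroring the numThreads/numConsumerUsages knobs above.
def runStressSketch(numThreads: Int, numUsages: Int)(acquireUseRelease: () => Unit): Option[Throwable] = {
  val firstError = new AtomicReference[Throwable](null)
  val pool = Executors.newFixedThreadPool(numThreads)
  (1 to numUsages).foreach { _ =>
    pool.submit(new Runnable {
      override def run(): Unit = {
        try {
          acquireUseRelease()                      // acquire a consumer, use it, then release it
          Thread.sleep(Random.nextInt(10).toLong)  // small jitter so threads interleave
        } catch {
          case t: Throwable =>
            firstError.compareAndSet(null, t)      // remember only the first failure
        }
      }
    })
  }
  pool.shutdown()
  pool.awaitTermination(1, TimeUnit.MINUTES)
  Option(firstError.get())
}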