
Commit 5363ea8

Simplified logic

tdas committed Mar 16, 2018
1 parent 37a9225 commit 5363ea8
Showing 2 changed files with 38 additions and 56 deletions.
@@ -85,7 +85,7 @@ private[kafka010] case class InternalKafkaConsumer(
   @volatile private var consumer = createConsumer

   /** indicates whether this consumer is in use or not */
-  @volatile var inuse = true
+  @volatile var inUse = true

   /** indicate whether this consumer is going to be stopped in the next release */
   @volatile var markedForClose = false
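
The two flags split the lifecycle cleanly: inUse says a task currently owns the consumer, and markedForClose asks whoever owns it to close it on release. A minimal, runnable sketch of that handshake (illustrative names, not code from this patch):

// Sketch of the two-flag handshake: `inUse` means "a task currently owns
// this resource"; `markedForClose` means "close it as soon as its current
// owner releases it". Both are @volatile so readers on other threads see
// up-to-date values even on paths that skip the lock.
object FlagHandshakeSketch {
  class Resource {
    @volatile var inUse = true           // set by the acquirer
    @volatile var markedForClose = false // set by an invalidator (e.g. task retry)
    def close(): Unit = println("closed")
  }

  def release(r: Resource): Unit = synchronized {
    if (r.markedForClose) r.close() // deferred close happens at release time
    else r.inUse = false            // otherwise just hand the resource back
  }

  def main(args: Array[String]): Unit = {
    val r = new Resource
    r.markedForClose = true // a retry invalidated the entry while it was in use
    release(r)              // prints "closed"
  }
}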
@@ -361,7 +361,8 @@ private[kafka010] object KafkaDataConsumer extends Logging {

   private case class CachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer)
     extends KafkaDataConsumer {
-    override def release(): Unit = { releaseConsumer(internalConsumer) }
+    assert(internalConsumer.inUse) // make sure this has been set to true
+    override def release(): Unit = { KafkaDataConsumer.release(internalConsumer) }
   }

   private case class NonCachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer)
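
Both wrappers expose the same release() contract; only what release does differs. Callers are expected to pair every acquire with exactly one release, typically in a finally block. A hypothetical caller-side sketch (simplified trait and names, not the real API):

// Hypothetical caller-side sketch: every acquired consumer must be released
// exactly once, even when the read fails, so release() lives in finally.
object AcquireReleaseSketch {
  trait DataConsumer {
    def get(offset: Long): String // stand-in for fetching one Kafka record
    def release(): Unit           // hand the consumer back to the cache
  }

  def readOne(consumer: DataConsumer, offset: Long): String =
    try consumer.get(offset)
    finally consumer.release() // runs on success and on failure alike

  def main(args: Array[String]): Unit = {
    val c = new DataConsumer {
      def get(offset: Long): String = s"record@$offset"
      def release(): Unit = println("released")
    }
    println(readOne(c, 42L)) // prints "released", then "record@42"
  }
}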
@@ -378,17 +379,14 @@ private[kafka010] object KafkaDataConsumer extends Logging {
   // - We make a best-effort attempt to maintain the max size of the cache as configured capacity.
   //   The capacity is not guaranteed to be maintained, especially when there are more active
   //   tasks simultaneously using consumers than the capacity.
-  // - A cached consumer will not be cleared while it is in use. This is an invariant that
-  //   we maintain.
   private lazy val cache = {
     val conf = SparkEnv.get.conf
     val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
     new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer](capacity, 0.75f, true) {
       override def removeEldestEntry(
           entry: ju.Map.Entry[CacheKey, InternalKafkaConsumer]): Boolean = {

-        // Try to remove the least-used entry if its currently not in use. This maintains the
-        // invariant mentioned in the docs above.
+        // Try to remove the least-used entry if it's currently not in use.
         //
         // If you cannot remove it, then the cache will keep growing. In the worst case,
         // the cache will grow to the max number of concurrent tasks that can run in the executor,
@@ -397,7 +395,7 @@ private[kafka010] object KafkaDataConsumer extends Logging {
         // likely running on a beefy machine that can handle a large number of simultaneously
         // active consumers.

-        if (entry.getValue.inuse == false && this.size > capacity) {
+        if (entry.getValue.inUse == false && this.size > capacity) {
           logWarning(
             s"KafkaConsumer cache hitting max capacity of $capacity, " +
               s"removing consumer for ${entry.getKey}")
@@ -415,43 +413,13 @@ private[kafka010] object KafkaDataConsumer extends Logging {
     }
   }

-  private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
-    synchronized {
-
-      // If it has been marked for close, then do it any way
-      if (intConsumer.inuse && intConsumer.markedForClose) intConsumer.close()
-      intConsumer.inuse = false
-
-      // Clear the consumer from the cache if this is indeed the consumer present in the cache
-      val key = new CacheKey(intConsumer.topicPartition, intConsumer.kafkaParams)
-      val cachedIntConsumer = cache.get(key)
-      if (cachedIntConsumer != null) {
-        if (cachedIntConsumer.eq(intConsumer)) {
-          // The released consumer is indeed the cached one.
-          cache.remove(key)
-        } else {
-          // The released consumer is not the cached one. Don't do anything.
-          // This should not happen as long as we maintain the invariant mentioned above.
-          logWarning(
-            s"Cached consumer not the same one as the one being release" +
-              s"\ncached = $cachedIntConsumer [${System.identityHashCode(cachedIntConsumer)}]" +
-              s"\nreleased = $intConsumer [${System.identityHashCode(intConsumer)}]")
-        }
-      } else {
-        // The released consumer is not in the cache. Don't do anything.
-        // This should not happen as long as we maintain the invariant mentioned above.
-        logWarning(s"Attempting to release consumer that is not in the cache")
-      }
-    }
-  }

   /**
    * Get a cached consumer for groupId, assigned to topic and partition.
    * If matching consumer doesn't already exist, will be created using kafkaParams.
    * The returned consumer must be released explicitly using [[KafkaDataConsumer.release()]].
    *
    * Note: This method guarantees that the consumer returned is not currently in use by any one
-   * else. Within this guarantee, this will make a best effort attempt to re-use consumers by
+   * else. Within this guarantee, this method will make a best effort attempt to re-use consumers by
    * caching them and tracking when they are in use.
    */
   def acquire(
@@ -467,44 +435,58 @@ private[kafka010] object KafkaDataConsumer extends Logging {
       // If this is reattempt at running the task, then invalidate cached consumer if any and
       // start with a new one.
       if (existingInternalConsumer != null) {
-        if (existingInternalConsumer.inuse) {
-          // Consumer exists in cache and is somehow in use. Don't close it immediately, but
-          // mark it for being closed when it is released.
+        // Consumer exists in cache. If it's in use, mark it for closing later, or close it now.
+        if (existingInternalConsumer.inUse) {
           existingInternalConsumer.markedForClose = true
-          NonCachedKafkaDataConsumer(newInternalConsumer)
-
         } else {
-          // Consumer exists in cache and is not in use, so close it immediately and replace
-          // it with a new one.
           existingInternalConsumer.close()
-          cache.put(key, newInternalConsumer)
-          CachedKafkaDataConsumer(newInternalConsumer)
-
         }
-      } else {
-        // Consumer is not cached, put the new one in the cache
-        cache.put(key, newInternalConsumer)
-        CachedKafkaDataConsumer(newInternalConsumer)
-
       }
+      cache.remove(key) // Invalidate the cache in any case
+      NonCachedKafkaDataConsumer(newInternalConsumer)

     } else if (!useCache) {
       // If planner asks to not reuse consumers, then do not use it, return a new consumer
       NonCachedKafkaDataConsumer(newInternalConsumer)

     } else if (existingInternalConsumer == null) {
       // If consumer is not already cached, then put a new one in the cache and return it
-      newInternalConsumer.inuse = true
       cache.put(key, newInternalConsumer)
+      newInternalConsumer.inUse = true
       CachedKafkaDataConsumer(newInternalConsumer)

-    } else if (existingInternalConsumer.inuse) {
+    } else if (existingInternalConsumer.inUse) {
       // If consumer is already cached but is currently in use, then return a new consumer
       NonCachedKafkaDataConsumer(newInternalConsumer)

     } else {
       // If consumer is already cached and is currently not in use, then return that consumer
+      existingInternalConsumer.inUse = true
       CachedKafkaDataConsumer(existingInternalConsumer)
     }
   }
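
The non-retry outcomes of acquire form a small decision table: no caching requested, cache miss, busy cached consumer, idle cached consumer. A self-contained sketch mirroring that branch structure (simplified types and illustrative names, not the real API):

// Sketch of acquire's decision table. One consumer per key; a cached
// consumer that is busy with another task is never shared, the caller
// gets a throwaway consumer instead.
object AcquireSketch {
  final class Consumer(val key: String) { var inUse = false }

  sealed trait Leased { def c: Consumer }
  case class Cached(c: Consumer) extends Leased    // release returns it to the cache
  case class NonCached(c: Consumer) extends Leased // release just closes it

  private val cache = scala.collection.mutable.Map.empty[String, Consumer]

  def acquire(key: String, useCache: Boolean): Leased = synchronized {
    val existing = cache.get(key).orNull
    if (!useCache) NonCached(new Consumer(key))              // planner opted out
    else if (existing == null) {                             // miss: cache a new one
      val c = new Consumer(key); c.inUse = true; cache(key) = c; Cached(c)
    } else if (existing.inUse) NonCached(new Consumer(key))  // busy: don't share
    else { existing.inUse = true; Cached(existing) }         // idle hit: reuse
  }

  def main(args: Array[String]): Unit = {
    val a = acquire("tp-0", useCache = true) // Cached (miss)
    val b = acquire("tp-0", useCache = true) // NonCached (busy)
    println((a.getClass.getSimpleName, b.getClass.getSimpleName))
  }
}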

+  private def release(intConsumer: InternalKafkaConsumer): Unit = {
+    synchronized {
+
+      // Clear the consumer from the cache if this is indeed the consumer present in the cache
+      val key = new CacheKey(intConsumer.topicPartition, intConsumer.kafkaParams)
+      val cachedIntConsumer = cache.get(key)
+      if (intConsumer.eq(cachedIntConsumer)) {
+        // The released consumer is the same object as the cached one.
+        if (intConsumer.markedForClose) {
+          intConsumer.close()
+          cache.remove(key)
+        } else {
+          intConsumer.inUse = false
+        }
+      } else {
+        // The released consumer is either not the same one as in the cache, or not in the cache
+        // at all. This may happen if the cache was invalidated while this consumer was being used.
+        // Just close this consumer.
+        intConsumer.close()
+        logInfo(s"Released a supposedly cached consumer that was not found in the cache")
+      }
+    }
+  }
 }
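
Note that release keys its decision on reference equality (eq): only the exact object sitting in the cache may be returned to it, while any other consumer for the same key is an orphan from an earlier invalidation and is simply closed. A tiny illustration of why eq rather than == matters here (sketch classes, not the real ones):

// `eq` compares object identity; `==` delegates to equals. Two consumers
// for the same topic-partition can be equal by key but must not be
// confused: only the identical cached instance may be "returned".
object EqSketch {
  case class FakeConsumer(topicPartition: String) // case class: == is structural

  def main(args: Array[String]): Unit = {
    val cached = FakeConsumer("topic-0")
    val orphan = FakeConsumer("topic-0") // same key, different object
    println(orphan == cached) // true:  structurally equal
    println(orphan eq cached) // false: not the cached instance, so close it
  }
}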
@@ -77,7 +77,7 @@ class KafkaDataConsumerSuite extends SharedSQLContext with PrivateMethodTester {
     ENABLE_AUTO_COMMIT_CONFIG -> "false"
   )

-  val numThreads = 50
+  val numThreads = 100
   val numConsumerUsages = 500

   @volatile var error: Throwable = null
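
Doubling numThreads to 100 makes the suite's concurrency test oversubscribe the consumer cache harder: 500 usages spread over 100 threads against a much smaller cache capacity. A standalone sketch of this style of stress test, assuming only the JDK (the real suite drives the actual KafkaDataConsumer):

// Standalone sketch of the stress pattern: many threads hammer an
// acquire/use/release cycle concurrently, any exception is captured into
// a @volatile field, and the main thread checks it at the end.
import java.util.concurrent.{Executors, TimeUnit}

object StressSketch {
  @volatile var error: Throwable = null

  def acquireUseRelease(i: Int): Unit = {
    // stand-in for: acquire a consumer, read some records, release it
    if (i < 0) throw new IllegalStateException(s"bad usage $i")
  }

  def main(args: Array[String]): Unit = {
    val numThreads = 100
    val numConsumerUsages = 500
    val pool = Executors.newFixedThreadPool(numThreads)
    (1 to numConsumerUsages).foreach { i =>
      pool.execute { () =>
        try acquireUseRelease(i)
        catch { case t: Throwable => error = t } // remember a failure, if any
      }
    }
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
    assert(error == null, s"concurrent usage failed: $error")
  }
}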
