
Commit

Fixed bugs
tdas committed Mar 9, 2018
1 parent 9e771b0 commit 0a838c1
Showing 2 changed files with 110 additions and 49 deletions.
@@ -82,7 +82,7 @@ private[kafka010] case class InternalKafkaConsumer(

private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

private var consumer = createConsumer
@volatile private var consumer = createConsumer

/** indicates whether this consumer is in use or not */
@volatile var inuse = true
@@ -91,8 +91,9 @@ private[kafka010] case class InternalKafkaConsumer(
@volatile var markedForClose = false

/** Iterator over the already fetched data */
private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
private var nextOffsetInFetchedData = UNKNOWN_OFFSET
@volatile private var fetchedData =
ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
@volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET

/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
@@ -358,31 +359,44 @@ private[kafka010] object KafkaDataConsumer extends Logging {

case class AvailableOffsetRange(earliest: Long, latest: Long)

private class CachedKafkaDataConsumer(val internalConsumer: InternalKafkaConsumer)
private case class CachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer)
extends KafkaDataConsumer {
override def release(): Unit = {
releaseKafkaConsumer(internalConsumer.topicPartition, internalConsumer.kafkaParams)
}
override def release(): Unit = { releaseConsumer(internalConsumer) }
}

private class NonCachedKafkaDataConsumer(val internalConsumer: InternalKafkaConsumer)
private case class NonCachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer)
extends KafkaDataConsumer {
override def release(): Unit = {
internalConsumer.close()
}
override def release(): Unit = { internalConsumer.close() }
}

private case class CacheKey(groupId: String, topicPartition: TopicPartition) {
def this(topicPartition: TopicPartition, kafkaParams: ju.Map[String, Object]) =
this(kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String], topicPartition)
}

// This cache has the following important properties.
// - We make a best-effort attempt to keep the cache size within the configured capacity.
//   The capacity is not guaranteed to be maintained, especially when there are more active
//   tasks simultaneously using consumers than the capacity.
// - A cached consumer will not be cleared while it is in use. This is an invariant that
//   we maintain.
private lazy val cache = {
val conf = SparkEnv.get.conf
val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer](capacity, 0.75f, true) {
override def removeEldestEntry(
entry: ju.Map.Entry[CacheKey, InternalKafkaConsumer]): Boolean = {

// Try to remove the least-recently-used entry if it is currently not in use. This maintains
// the invariant mentioned in the docs above.
//
// If it cannot be removed, then the cache will keep growing. In the worst case,
// the cache will grow to the max number of concurrent tasks that can run in the executor
// (that is, the number of task slots), after which it will never shrink. This is unlikely to
// be a serious problem because an executor with more than 64 (default) task slots is
// likely running on a beefy machine that can handle a large number of simultaneously
// active consumers.

if (entry.getValue.inuse == false && this.size > capacity) {
logWarning(
s"KafkaConsumer cache hitting max capacity of $capacity, " +
@@ -401,30 +415,40 @@ private[kafka010] object KafkaDataConsumer extends Logging {
}
}
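
The eviction policy above hinges on java.util.LinkedHashMap's access-ordered mode plus an overridden removeEldestEntry. Below is a minimal, self-contained sketch of that pattern under assumed names (the Entry class, keys, and capacity are illustrative, not the Spark code): eviction of the least-recently-used entry is skipped while it is in use, so the map can temporarily exceed its nominal capacity, which is exactly the best-effort behavior described in the comment.

import java.{util => ju}

object LruCacheSketch extends App {
  // Hypothetical cache entry; `inUse` plays the role of InternalKafkaConsumer.inuse.
  class Entry(val name: String, var inUse: Boolean)

  val capacity = 2
  // accessOrder = true: iteration order is least-recently-accessed first.
  val cache = new ju.LinkedHashMap[String, Entry](capacity, 0.75f, true) {
    // LinkedHashMap calls this after every put; returning true evicts the eldest entry.
    // An in-use entry is never evicted, so the map can temporarily exceed `capacity`.
    override def removeEldestEntry(eldest: ju.Map.Entry[String, Entry]): Boolean = {
      !eldest.getValue.inUse && size() > capacity
    }
  }

  cache.put("a", new Entry("a", inUse = true))
  cache.put("b", new Entry("b", inUse = false))
  cache.put("c", new Entry("c", inUse = false)) // "a" is eldest but in use, so nothing is evicted
  println(cache.size()) // 3: grew past capacity because "a" could not be dropped
}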

private def releaseKafkaConsumer(
topicPartition: TopicPartition,
kafkaParams: ju.Map[String, Object]): Unit = {
val key = new CacheKey(topicPartition, kafkaParams)

private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
synchronized {
val consumer = cache.get(key)
if (consumer != null) {
if (consumer.markedForClose) {
consumer.close()

// If it has been marked for close, then close it anyway
if (intConsumer.inuse && intConsumer.markedForClose) intConsumer.close()
intConsumer.inuse = false

// Clear the consumer from the cache if this is indeed the consumer present in the cache
val key = new CacheKey(intConsumer.topicPartition, intConsumer.kafkaParams)
val cachedIntConsumer = cache.get(key)
if (cachedIntConsumer != null) {
if (cachedIntConsumer.eq(intConsumer)) {
// The released consumer is indeed the cached one.
cache.remove(key)
} else {
consumer.inuse = false
// The released consumer is not the cached one. Don't do anything.
// This should not happen as long as we maintain the invariant mentioned above.
logWarning(
s"Cached consumer not the same one as the one being release" +
s"\ncached = $cachedIntConsumer [${System.identityHashCode(cachedIntConsumer)}]" +
s"\nreleased = $intConsumer [${System.identityHashCode(intConsumer)}]")
}
} else {
logWarning(s"Attempting to release consumer that does not exist")
// The released consumer is not in the cache. Don't do anything.
// This should not happen as long as we maintain the invariant mentioned above.
logWarning(s"Attempting to release consumer that is not in the cache")
}
}
}
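
Note the cachedIntConsumer.eq(intConsumer) reference check above: InternalKafkaConsumer is a case class, so == compares instances structurally, and two distinct objects for the same topic partition and kafka params would compare equal. Only reference equality guarantees that the entry removed from the cache is the exact instance being released. A tiny sketch with a hypothetical case class illustrates the difference:

object ReferenceEqualitySketch extends App {
  // Hypothetical stand-in for InternalKafkaConsumer: case-class equality is structural.
  case class Consumer(topic: String, partition: Int)

  val cached = Consumer("some-topic", 0)
  val released = Consumer("some-topic", 0) // a distinct instance for the same partition

  println(released == cached) // true  -- structural equality would wrongly match
  println(released eq cached) // false -- reference equality: not the cached instance
}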


/**
* Get a cached consumer for the given group id, assigned to the given topic and partition.
* If a matching consumer does not already exist, it will be created using kafkaParams.
* This makes a best-effort attempt to reuse a cached consumer when possible.
*/
def acquire(
topicPartition: TopicPartition,
@@ -433,26 +457,50 @@ private[kafka010] object KafkaDataConsumer extends Logging {
val key = new CacheKey(topicPartition, kafkaParams)
val existingInternalConsumer = cache.get(key)

lazy val newNonCachedConsumer =
new NonCachedKafkaDataConsumer(new InternalKafkaConsumer(topicPartition, kafkaParams))
lazy val newInternalConsumer = new InternalKafkaConsumer(topicPartition, kafkaParams)

if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
// If this is reattempt at running the task, then invalidate cache and start with
// a new consumer
// If this is a reattempt at running the task, then invalidate the cached consumer if any,
// and start with a new one.
if (existingInternalConsumer != null) {
existingInternalConsumer.markedForClose = true
if (existingInternalConsumer.inuse) {
// Consumer exists in cache and is somehow in use. Don't close it immediately, but
// mark it to be closed when it is released.
existingInternalConsumer.markedForClose = true
NonCachedKafkaDataConsumer(newInternalConsumer)

} else {
// Consumer exists in cache and is not in use, so close it immediately and replace
// it with a new one.
existingInternalConsumer.close()
cache.put(key, newInternalConsumer)
CachedKafkaDataConsumer(newInternalConsumer)

}
} else {
// Consumer is not cached, put the new one in the cache
cache.put(key, newInternalConsumer)
CachedKafkaDataConsumer(newInternalConsumer)

}
newNonCachedConsumer
} else if (!useCache) {
newNonCachedConsumer
// If the planner asks not to reuse consumers, then do not use the cache; return a new consumer
NonCachedKafkaDataConsumer(newInternalConsumer)

} else if (existingInternalConsumer == null) {
newNonCachedConsumer.internalConsumer.inuse = true
cache.put(key, newNonCachedConsumer.internalConsumer)
newNonCachedConsumer
// If the consumer is not already cached, then put a new one in the cache and return it
newInternalConsumer.inuse = true
cache.put(key, newInternalConsumer)
CachedKafkaDataConsumer(newInternalConsumer)

} else if (existingInternalConsumer.inuse) {
newNonCachedConsumer
// If consumer is already cached but is currently in use, then return a new consumer
NonCachedKafkaDataConsumer(newInternalConsumer)

} else {
new CachedKafkaDataConsumer(existingInternalConsumer)
// If consumer is already cached and is currently not in use, then return that consumer
CachedKafkaDataConsumer(existingInternalConsumer)

}
}
}
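
For context, here is a minimal usage sketch of the new acquire/release API as a caller inside the org.apache.spark.sql.kafka010 package might use it. The topic, group id, broker address, and wrapper object are placeholders, a live SparkEnv is assumed (the consumer cache reads its capacity from the Spark conf), and get is called with the same positional arguments as in the test below.

package org.apache.spark.sql.kafka010

import java.{util => ju}

import scala.collection.JavaConverters._

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Usage sketch only; not part of this commit.
object KafkaDataConsumerUsageSketch {
  def readAvailable(): Seq[String] = {
    val topicPartition = new TopicPartition("some-topic", 0)
    val kafkaParams: ju.Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "group.id" -> "some-group",
      "key.deserializer" -> classOf[ByteArrayDeserializer].getName,
      "value.deserializer" -> classOf[ByteArrayDeserializer].getName,
      "enable.auto.commit" -> "false"
    ).asJava

    // Returns either a CachedKafkaDataConsumer or a NonCachedKafkaDataConsumer depending on
    // the cache state, whether caching was requested, and whether this is a task reattempt.
    val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = true)
    try {
      val range = consumer.getAvailableOffsetRange()
      (range.earliest until range.latest).map { offset =>
        new String(consumer.get(offset, Long.MaxValue, 10000, failOnDataLoss = false).value())
      }
    } finally {
      // Always release: a cached consumer goes back to the pool, a non-cached one is closed.
      consumer.release()
    }
  }
}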
@@ -29,6 +29,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.scalatest.PrivateMethodTester

import org.apache.spark.{TaskContext, TaskContextImpl}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.ThreadUtils

@@ -59,10 +60,11 @@ class KafkaDataConsumerSuite extends SharedSQLContext with PrivateMethodTester {
assert(e.getCause === cause)
}

test("concurrent use of KafkaDataConsumer") {
test("SPARK-23623: concurrent use of KafkaDataConsumer") {
val topic = "topic" + Random.nextInt()
val data = (1 to 1000).map(_.toString)
testUtils.createTopic(topic, 1)
testUtils.sendMessages(topic, (1 to 1000).map(_.toString).toArray)
testUtils.sendMessages(topic, data.toArray)
val topicPartition = new TopicPartition(topic, 0)

import ConsumerConfig._
@@ -78,18 +80,25 @@ class KafkaDataConsumerSuite extends SharedSQLContext with PrivateMethodTester {
val numThreads = 50
val numConsumerUsages = 500

val threadpool = Executors.newFixedThreadPool(numThreads)

@volatile var error: Throwable = null

def consume(i: Int): Unit = {
val useCache = Random.nextBoolean
val taskContext = if (Random.nextBoolean) {
new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2), null, null, null)
} else {
null
}
TaskContext.setTaskContext(taskContext)
val consumer = KafkaDataConsumer.acquire(
topicPartition, kafkaParams.asJava, useCache = true)
topicPartition, kafkaParams.asJava, useCache)
try {
val range = consumer.getAvailableOffsetRange()
for (offset <- range.earliest until range.latest) {
consumer.get(offset, Long.MaxValue, 10000, failOnDataLoss = false)
val rcvd = range.earliest until range.latest map { offset =>
val bytes = consumer.get(offset, Long.MaxValue, 10000, failOnDataLoss = false).value()
new String(bytes)
}
assert(rcvd == data)
} catch {
case e: Throwable =>
error = e
@@ -99,13 +108,17 @@ class KafkaDataConsumerSuite extends SharedSQLContext with PrivateMethodTester {
}
}

// Submit all the consumer usages to the thread pool
val futures = (1 to numConsumerUsages).map { i =>
threadpool.submit(new Runnable {
override def run(): Unit = { consume(i) }
})
val threadpool = Executors.newFixedThreadPool(numThreads)
try {
val futures = (1 to numConsumerUsages).map { i =>
threadpool.submit(new Runnable {
override def run(): Unit = { consume(i) }
})
}
futures.foreach(_.get(1, TimeUnit.MINUTES))
assert(error == null)
} finally {
threadpool.shutdown()
}
futures.foreach(_.get(1, TimeUnit.MINUTES))
assert(error == null)
}
}
