From 297f47a1c780898435785f1c2b0cb033408673f6 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 3 Sep 2019 09:43:03 +0900 Subject: [PATCH] Update document to reflect changes on consumer pool and fetched data pool --- .../structured-streaming-kafka-integration.md | 68 ++++++++++++++++--- .../apache/spark/sql/kafka010/package.scala | 14 ++-- 2 files changed, 66 insertions(+), 16 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 339792b4139f3..d18463ea3ed60 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -430,20 +430,70 @@ The following configurations are optional: ### Consumer Caching It's time-consuming to initialize Kafka consumers, especially in streaming scenarios where processing time is a key factor. -Because of this, Spark caches Kafka consumers on executors. The caching key is built up from the following information: +Because of this, Spark pools Kafka consumers on executors, by leveraging Apache Commons Pool. + +The caching key is built up from the following information: + * Topic name * Topic partition * Group ID -The size of the cache is limited by spark.kafka.consumer.cache.capacity (default: 64). -If this threshold is reached, it tries to remove the least-used entry that is currently not in use. -If it cannot be removed, then the cache will keep growing. In the worst case, the cache will grow to -the max number of concurrent tasks that can run in the executor (that is, number of tasks slots), -after which it will never reduce. +The following properties are available to configure the consumer pool: + + + + + + + + + + + + + + + + + + +
Property NameDefaultMeaning
spark.kafka.consumer.cache.capacityThe maximum number of consumers cached. Please note that it's a soft limit.64
spark.kafka.consumer.cache.timeoutThe minimum amount of time a consumer may sit idle in the pool before it is eligible for eviction by the evictor.5m (5 minutes)
spark.kafka.consumer.cache.jmx.enableEnable or disable JMX for pools created with this configuration instance. Statistics of the pool are available via JMX instance. + The prefix of JMX name is set to "kafka010-cached-simple-kafka-consumer-pool". + false
+ +The size of the pool is limited by spark.kafka.consumer.cache.capacity, +but it works as "soft-limit" to not block Spark tasks. + +Idle eviction thread periodically removes some consumers which are not used. If this threshold is reached when borrowing, +it tries to remove the least-used entry that is currently not in use. + +If it cannot be removed, then the pool will keep growing. In the worst case, the pool will grow to +the max number of concurrent tasks that can run in the executor (that is, number of tasks slots). + +If a task fails for any reason, the new task is executed with a newly created Kafka consumer for safety reasons. +At the same time, we invalidate all consumers in pool which have same caching key, to remove consumer which was used +in failed execution. Consumers which any other tasks are using will not be closed, but will be invalidated as well +when they are returned into pool. -If a task fails for any reason the new task is executed with a newly created Kafka consumer for safety reasons. -At the same time the cached Kafka consumer which was used in the failed execution will be invalidated. Here it has to -be emphasized it will not be closed if any other task is using it. +Along with consumers, Spark pools the records fetched from Kafka separately, to let Kafka consumers stateless in point +of Spark's view, and maximize the efficiency of pooling. It leverages same cache key with Kafka consumers pool. +Note that it doesn't leverage Apache Commons Pool due to the difference of characteristics. + +The following properties are available to configure the fetched data pool: + + + + + + + + + + + + + +
Property NameDefaultMeaning
spark.kafka.consumer.fetchedData.cache.timeoutThe maximum number of fetched data cached. Please note that it's a soft limit.64
spark.kafka.consumer.fetchedData.cache.evictorThreadRunIntervalThe minimum amount of time a fetched data may sit idle in the pool before it is eligible for eviction by the evictor.5m (5 minutes)
## Writing Data to Kafka diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala index c68ec9bccd7fd..c7784b90aba9d 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala @@ -53,6 +53,13 @@ package object kafka010 { // scalastyle:ignore .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("5m") + private[kafka010] val CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL = + ConfigBuilder("spark.kafka.consumer.cache.evictorThreadRunInterval") + .doc("The interval of time between runs of the idle evictor thread for consumer pool. " + + "When non-positive, no idle evictor thread will be run.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("3m") + private[kafka010] val FETCHED_DATA_CACHE_TIMEOUT = ConfigBuilder("spark.kafka.consumer.fetchedData.cache.timeout") .doc("The minimum amount of time a fetched data may sit idle in the pool before " + @@ -61,13 +68,6 @@ package object kafka010 { // scalastyle:ignore .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("5m") - private[kafka010] val CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL = - ConfigBuilder("spark.kafka.consumer.cache.evictorThreadRunInterval") - .doc("The interval of time between runs of the idle evictor thread for consumer pool. " + - "When non-positive, no idle evictor thread will be run.") - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefaultString("3m") - private[kafka010] val FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL = ConfigBuilder("spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval") .doc("The interval of time between runs of the idle evictor thread for fetched data pool. " +