diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 339792b4139f3..d18463ea3ed60 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -430,20 +430,70 @@ The following configurations are optional:
### Consumer Caching
It's time-consuming to initialize Kafka consumers, especially in streaming scenarios where processing time is a key factor.
-Because of this, Spark caches Kafka consumers on executors. The caching key is built up from the following information:
+Because of this, Spark pools Kafka consumers on executors by leveraging Apache Commons Pool.
+
+The caching key is built up from the following information:
+
* Topic name
* Topic partition
* Group ID
-The size of the cache is limited by spark.kafka.consumer.cache.capacity (default: 64).
-If this threshold is reached, it tries to remove the least-used entry that is currently not in use.
-If it cannot be removed, then the cache will keep growing. In the worst case, the cache will grow to
-the max number of concurrent tasks that can run in the executor (that is, number of tasks slots),
-after which it will never reduce.
+The following properties are available to configure the consumer pool:
+
+Property Name | Default | Meaning
+--- | --- | ---
+`spark.kafka.consumer.cache.capacity` | 64 | The maximum number of consumers cached. Please note that it's a soft limit.
+`spark.kafka.consumer.cache.timeout` | 5m (5 minutes) | The minimum amount of time a consumer may sit idle in the pool before it is eligible for eviction by the evictor.
+`spark.kafka.consumer.cache.jmx.enable` | false | Enable or disable JMX for pools created with this configuration instance. Statistics of the pool are available via JMX instance. The prefix of JMX name is set to "kafka010-cached-simple-kafka-consumer-pool".
+
+The size of the pool is limited by `spark.kafka.consumer.cache.capacity`, but it works as a "soft limit" so that Spark tasks are never blocked.
+
+An idle eviction thread periodically removes consumers that have been idle for longer than the configured timeout. If the capacity threshold is reached when borrowing,
+the pool tries to remove the least-used entry that is currently not in use.
+
+If it cannot be removed, the pool will keep growing. In the worst case, the pool will grow to
+the maximum number of concurrent tasks that can run in the executor (that is, the number of task slots).
+
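+As a rough sketch, these pool settings can be supplied when the session is created. The configuration keys are the ones documented above; the application name and the values below are purely illustrative, not recommendations:
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+// Illustrative tuning of the executor-side consumer pool:
+// enlarge the soft-limited capacity, shorten the idle timeout,
+// and expose pool statistics via JMX.
+val spark = SparkSession.builder()
+  .appName("kafka-consumer-pool-tuning")   // hypothetical application name
+  .config("spark.kafka.consumer.cache.capacity", "128")
+  .config("spark.kafka.consumer.cache.timeout", "2m")
+  .config("spark.kafka.consumer.cache.jmx.enable", "true")
+  .getOrCreate()
+```
+
+The same keys can also be passed with `--conf` on `spark-submit`.
+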
+If a task fails for any reason, the new task is executed with a newly created Kafka consumer for safety reasons.
+At the same time, all consumers in the pool that have the same caching key are invalidated, so that the consumer which was used
+in the failed execution is removed. Consumers which are in use by other tasks will not be closed, but will be invalidated as well
+when they are returned to the pool.
-If a task fails for any reason the new task is executed with a newly created Kafka consumer for safety reasons.
-At the same time the cached Kafka consumer which was used in the failed execution will be invalidated. Here it has to
-be emphasized it will not be closed if any other task is using it.
+Along with consumers, Spark pools the records fetched from Kafka separately, to keep Kafka consumers stateless from Spark's point
+of view and to maximize the efficiency of pooling. The fetched data pool reuses the same caching key as the Kafka consumer pool.
+Note that it doesn't leverage Apache Commons Pool due to the difference in characteristics.
+
+The following properties are available to configure the fetched data pool:
+
+Property Name | Default | Meaning
+--- | --- | ---
+`spark.kafka.consumer.fetchedData.cache.timeout` | 5m (5 minutes) | The minimum amount of time fetched data may sit idle in the pool before it is eligible for eviction by the evictor.
+`spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval` | 3m (3 minutes) | The interval of time between runs of the idle evictor thread for fetched data pool. When non-positive, no idle evictor thread will be run.
+
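+As with the consumer pool, the fetched data pool is tuned through ordinary Spark configuration. A minimal sketch follows; the broker address, topic, application name and timeout value are placeholders, and the query shown is the standard Kafka source read from this guide:
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+// Hypothetical example: shorten how long idle fetched data is kept in the pool.
+val spark = SparkSession.builder()
+  .appName("kafka-fetched-data-pool-tuning")   // hypothetical application name
+  .config("spark.kafka.consumer.fetchedData.cache.timeout", "2m")
+  .getOrCreate()
+
+// Both pools are used transparently on executors; the query itself is the
+// usual Kafka source read, keyed internally by topic, partition and group ID.
+val df = spark.readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:9092")
+  .option("subscribe", "topic1")
+  .load()
+```
+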
## Writing Data to Kafka
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
index c68ec9bccd7fd..c7784b90aba9d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
@@ -53,6 +53,13 @@ package object kafka010 { // scalastyle:ignore
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5m")
+ private[kafka010] val CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL =
+ ConfigBuilder("spark.kafka.consumer.cache.evictorThreadRunInterval")
+ .doc("The interval of time between runs of the idle evictor thread for consumer pool. " +
+ "When non-positive, no idle evictor thread will be run.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("3m")
+
private[kafka010] val FETCHED_DATA_CACHE_TIMEOUT =
ConfigBuilder("spark.kafka.consumer.fetchedData.cache.timeout")
.doc("The minimum amount of time a fetched data may sit idle in the pool before " +
@@ -61,13 +68,6 @@ package object kafka010 { // scalastyle:ignore
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5m")
- private[kafka010] val CONSUMER_CACHE_EVICTOR_THREAD_RUN_INTERVAL =
- ConfigBuilder("spark.kafka.consumer.cache.evictorThreadRunInterval")
- .doc("The interval of time between runs of the idle evictor thread for consumer pool. " +
- "When non-positive, no idle evictor thread will be run.")
- .timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("3m")
-
private[kafka010] val FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL =
ConfigBuilder("spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval")
.doc("The interval of time between runs of the idle evictor thread for fetched data pool. " +