Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-23623] [SS] Avoid concurrent use of cached consumers in CachedKafkaConsumer #20767

Closed
wants to merge 5 commits into from

Conversation

tdas
Copy link
Contributor

@tdas tdas commented Mar 8, 2018

What changes were proposed in this pull request?

CacheKafkaConsumer in the project kafka-0-10-sql is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one task using trying to read the same Kafka TopicPartition at the same time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. And any cases where this assumption may not be true, we have SparkPlan flag to disable the use of a cache. So it was up to the planner to correctly identify when it was not safe to use the cache and set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, whether there will be multiple tasks in the same query trying to read the same partition. Case in point, 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's pretty non-trivial to figure out how this leads to two tasks reading the same partition twice, possibly concurrently. And due to the non-triviality, it is hard to figure this out in the planner and set the flag to avoid the cache / consumer pool. And this can inadvertently lead to ConcurrentModificationException ,or worse, silent reading of incorrect data.

Here is a better way to design this. The planner shouldnt have to understand these low-level optimizations. Rather the consumer pool should be smart enough avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag inuse is not checked when returning a cached consumer, see this). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that should be closed when the task is done. Then the planner does not have to have a flag to avoid reuses.

This PR is a step towards that goal. It does the following.

  • There are effectively two kinds of consumer that may be generated
    • Cached consumer - this should be returned to the pool at task end
    • Non-cached consumer - this should be closed at task end
  • A trait called KafkaConsumer is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply called val consumer = KafkaConsumer.acquire and then consumer.release().
  • If there is request for a consumer that is in-use, then a new consumer is generated.
  • If there is a concurrent attempt of the same task, then a new consumer is generated, and the existing cached consumer is marked for close upon release.
  • In addition, I renamed the classes because CachedKafkaConsumer is a misnomer given that what it returns may or may not be cached.

This PR does not remove the planner flag to avoid reuse to make this patch safe enough for merging in branch-2.3. This can be done later in master-only.

How was this patch tested?

A new stress test that verifies it is safe to concurrently get consumers for the same partition from the consumer pool.

@tdas tdas changed the title Fixed [SPARK-23623] [SS] Avoid concurrent use of cached consumers in CachedKafkaConsumer Mar 8, 2018
@tdas
Copy link
Contributor Author

tdas commented Mar 8, 2018

@zsxwing @brkyvz PTAL.

@SparkQA
Copy link

SparkQA commented Mar 8, 2018

Test build #88070 has finished for PR 20767 at commit 97510c6.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Copy link
Contributor Author

tdas commented Mar 8, 2018

jenkins retest this please.

@SparkQA
Copy link

SparkQA commented Mar 8, 2018

Test build #88081 has finished for PR 20767 at commit 9e771b0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 8, 2018

Test build #88082 has finished for PR 20767 at commit 9e771b0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@zsxwing zsxwing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments

} else if (existingInternalConsumer == null) {
newNonCachedConsumer.internalConsumer.inuse = true
cache.put(key, newNonCachedConsumer.internalConsumer)
newNonCachedConsumer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should return a CachedKafkaDataConsumer in this case. Right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yes. damn it. my bad.

cache.remove(key)
} else {
consumer.inuse = false
}
} else {
logWarning(s"Attempting to release consumer that does not exist")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the case that a consumer may be evicted because of the max capacity. In this case, we should close the internal consumer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah. The warning was misleading. Will add comments to clarify that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be the case. We do not remove any consumer from the cache while it is being used. So the scenario that you mentioned should not happen.

cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams))
// If this is reattempt at running the task, then invalidate cache and start with
// a new consumer
if (existingInternalConsumer != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic here seems wrong. I think it should be something like this?

      if (existingInternalConsumer != null) {
        if (existingInternalConsumer.inuse) {
          existingInternalConsumer.markedForClose = true
          newNonCachedConsumer
        } else {
          existingInternalConsumer.close()
          cache.put(key, newNonCachedConsumer.internalConsumer)
          new CachedKafkaDataConsumer(newNonCachedConsumer.internalConsumer)
        }
      } else {
        cache.put(key, newNonCachedConsumer.internalConsumer)
        new CachedKafkaDataConsumer(newNonCachedConsumer.internalConsumer)
      }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is indeed better. What I was doing was always deferring to a later point. But that would lead to it being used one more time before being closed.

val numThreads = 50
val numConsumerUsages = 500

val threadpool = Executors.newFixedThreadPool(numThreads)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: threadpool should be shut down

@SparkQA
Copy link

SparkQA commented Mar 9, 2018

Test build #88109 has finished for PR 20767 at commit 0a838c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@brkyvz brkyvz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love this. Left one comment.

}
}
}

/**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using kafkaParams.
* This will make a best effort attempt to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love to see the rest of this sentence. Such a cliffhanger!

@SparkQA
Copy link

SparkQA commented Mar 9, 2018

Test build #88140 has finished for PR 20767 at commit 37a9225.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

private var consumer = createConsumer
@volatile private var consumer = createConsumer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these @volatiles are not necessary. I'm okey with them though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, i just added them to be safer. one less thing to worry about.


} else {
// If consumer is already cached and is currently not in use, then return that consumer
CachedKafkaDataConsumer(existingInternalConsumer)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should set existingInternalConsumer.isuse = true

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why this was not caught in the stress test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i will run a longer stress test locally just to be sure.

val consumer = cache.get(key)
consumer.inuse = true
consumer
} else if (!useCache) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this if should be moved before the above if.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? I am saying that we should NOT reuse consumers for ANY task retries, independent of the desire to use the cache or not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When useCache is false, i would expect newInternalConsumer should never be put into the cache. The above if may put newInternalConsumer into the cache. If we put a consumer used by a continuous processing query into the cache and assume it never ends, it will prevent other micro batch queries from putting a consumer reading the same topic partition into the cache.

Copy link
Contributor Author

@tdas tdas Mar 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically that wont happens because the continuous query and the batch query will have different groupids. But I agree that if useCache is false, then we should not put it in the cache in any way. In fact, we can simplify the task retry case further by never putting the new one in the cache, only invalidate the existing cached one. The only scenario whether this will hurt a little would be the micro-batch immediately after the reattempt will create a new consumer. Thats a tiny one time cost in a scenario whether the reattempt has already made is slightly costly.

if (cachedIntConsumer != null) {
if (cachedIntConsumer.eq(intConsumer)) {
// The released consumer is indeed the cached one.
cache.remove(key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove it only when it's closed.

if (removedConsumer != null) {
removedConsumer.close()
// If it has been marked for close, then do it any way
if (intConsumer.inuse && intConsumer.markedForClose) intConsumer.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible we have the following condition - should intConsumer.close() be called ?

!intConsumer.inuse && intConsumer.markedForClose

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rewrote the logic. Hopefully, it's simpler to reason about it now.

}
} else {
// Consumer is not cached, put the new one in the cache
cache.put(key, newInternalConsumer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should newInternalConsumer.inuse = true be called ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. correct. thanks!

@koeninger
Copy link
Contributor

Can you clarify why you want to allow only 1 cached consumer per topicpartition, closing any others at task end?

It seems like opening and closing consumers would be less efficient than allowing a pool of more than one consumer per topicpartition.

@tdas
Copy link
Contributor Author

tdas commented Mar 15, 2018

@koeninger good question Cody! I think we should fix this limitation eventually. The only reason I am not doing that in this PR is to keep the changes minimum for backporting to 2.3.x. Eventually, we should not do such cache management, and rather use something like Apache Common Pool.

@tdas
Copy link
Contributor Author

tdas commented Mar 16, 2018

@tedyu @zsxwing thank you very much for catching the bugs. I have simplified the logic quite a bit. Note that I removed the invariant that I had introduced earlier. Additionally, I locally ran the stress test with 100 threads and 10000 read attempts, which ran for 2 mins. It passed. Please review the logic one again.

@SparkQA
Copy link

SparkQA commented Mar 16, 2018

Test build #88285 has finished for PR 20767 at commit 5363ea8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

CachedKafkaDataConsumer(newInternalConsumer)

} else if (existingInternalConsumer.inuse) {
} else if (existingInternalConsumer.inUse) {
// If consumer is already cached but is currently in use, then return a new consumer
NonCachedKafkaDataConsumer(newInternalConsumer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe keep an internal counter for how many times the non cached consumer is created.
This would give us information on how effective the cache is

@tdas
Copy link
Contributor Author

tdas commented Mar 16, 2018 via email

@zsxwing
Copy link
Member

zsxwing commented Mar 16, 2018

@tdas this is much simpler!!! LGTM. Merging to master.

@asfgit asfgit closed this in bd201bf Mar 16, 2018
@tedyu
Copy link
Contributor

tedyu commented Mar 16, 2018

@tdas
Do you think a follow on JIRA can be logged for adding metrics for the cache operations ?

Thanks

@zsxwing
Copy link
Member

zsxwing commented Mar 16, 2018

@tedyu that's a good idea. Could you create a ticket? Thanks!

@tdas
Copy link
Contributor Author

tdas commented Mar 16, 2018

@tedyu @zsxwing My thoughts on this is that we should consider migrating to something like Apache Common Pool (assuming it does not require additional maven libraries), which might be less maintenance load. It could be that it already has metrics, etc. that we can leverage.

@tedyu
Copy link
Contributor

tedyu commented Mar 16, 2018

I did a quick search for 'apache commons pool metrics' which didn't show up directly related links.

tdas added a commit to tdas/spark that referenced this pull request Mar 16, 2018
…afkaConsumer

CacheKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one task using trying to read the same Kafka TopicPartition at the same time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. And any cases where this assumption may not be true, we have SparkPlan flag to disable the use of a cache. So it was up to the planner to correctly identify when it was not safe to use the cache and set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, whether there will be multiple tasks in the same query trying to read the same partition. Case in point, 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's pretty non-trivial to figure out how this leads to two tasks reading the same partition twice, possibly concurrently. And due to the non-triviality, it is hard to figure this out in the planner and set the flag to avoid the cache / consumer pool. And this can inadvertently lead to ConcurrentModificationException ,or worse, silent reading of incorrect data.

Here is a better way to design this. The planner shouldnt have to understand these low-level optimizations. Rather the consumer pool should be smart enough avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag inuse is not checked when returning a cached consumer, see [this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that should be closed when the task is done. Then the planner does not have to have a flag to avoid reuses.

This PR is a step towards that goal. It does the following.
- There are effectively two kinds of consumer that may be generated
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait called KafkaConsumer is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply called `val consumer = KafkaConsumer.acquire` and then `consumer.release()`.
- If there is request for a consumer that is in-use, then a new consumer is generated.
- If there is a concurrent attempt of the same task, then a new consumer is generated, and the existing cached consumer is marked for close upon release.
- In addition, I renamed the classes because CachedKafkaConsumer is a misnomer given that what it returns may or may not be cached.

This PR does not remove the planner flag to avoid reuse to make this patch safe enough for merging in branch-2.3. This can be done later in master-only.

A new stress test that verifies it is safe to concurrently get consumers for the same partition from the consumer pool.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#20767 from tdas/SPARK-23623.
@tdas
Copy link
Contributor Author

tdas commented Mar 16, 2018

@tedyu It was indeed hard to find :) But apache commons pool does expose metrics on idle/active counts. See https://commons.apache.org/proper/commons-pool/apidocs/org/apache/commons/pool2/impl/BaseGenericObjectPool.html

@tdas
Copy link
Contributor Author

tdas commented Mar 16, 2018

@tedyu Just to be clear, I am not saying that we have to move to this pool stuff. I am just saying that if we want to make this more robust (as @koeninger suggested as well) , then we should try to use existing tools (after careful evaluation), rather than trying to reinvent the wheel.

asfgit pushed a commit that referenced this pull request Mar 17, 2018
…afkaConsumer (branch-2.3)

This is a backport of #20767 to branch 2.3

## What changes were proposed in this pull request?
CacheKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one task using trying to read the same Kafka TopicPartition at the same time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. And any cases where this assumption may not be true, we have SparkPlan flag to disable the use of a cache. So it was up to the planner to correctly identify when it was not safe to use the cache and set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, whether there will be multiple tasks in the same query trying to read the same partition. Case in point, 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's pretty non-trivial to figure out how this leads to two tasks reading the same partition twice, possibly concurrently. And due to the non-triviality, it is hard to figure this out in the planner and set the flag to avoid the cache / consumer pool. And this can inadvertently lead to ConcurrentModificationException ,or worse, silent reading of incorrect data.

Here is a better way to design this. The planner shouldnt have to understand these low-level optimizations. Rather the consumer pool should be smart enough avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag inuse is not checked when returning a cached consumer, see [this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that should be closed when the task is done. Then the planner does not have to have a flag to avoid reuses.

This PR is a step towards that goal. It does the following.
- There are effectively two kinds of consumer that may be generated
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait called KafkaConsumer is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply called `val consumer = KafkaConsumer.acquire` and then `consumer.release()`.
- If there is request for a consumer that is in-use, then a new consumer is generated.
- If there is a concurrent attempt of the same task, then a new consumer is generated, and the existing cached consumer is marked for close upon release.
- In addition, I renamed the classes because CachedKafkaConsumer is a misnomer given that what it returns may or may not be cached.

This PR does not remove the planner flag to avoid reuse to make this patch safe enough for merging in branch-2.3. This can be done later in master-only.

## How was this patch tested?
A new stress test that verifies it is safe to concurrently get consumers for the same partition from the consumer pool.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20848 from tdas/SPARK-23623-2.3.
@gaborgsomogyi
Copy link
Contributor

@tdas @zsxwing @koeninger @tedyu do you think it makes sense to make similar step in the DStream area like this and then later follow with the mentioned Apache Common Pool?

mstewart141 pushed a commit to mstewart141/spark that referenced this pull request Mar 24, 2018
…afkaConsumer

## What changes were proposed in this pull request?

CacheKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one task using trying to read the same Kafka TopicPartition at the same time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. And any cases where this assumption may not be true, we have SparkPlan flag to disable the use of a cache. So it was up to the planner to correctly identify when it was not safe to use the cache and set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, whether there will be multiple tasks in the same query trying to read the same partition. Case in point, 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's pretty non-trivial to figure out how this leads to two tasks reading the same partition twice, possibly concurrently. And due to the non-triviality, it is hard to figure this out in the planner and set the flag to avoid the cache / consumer pool. And this can inadvertently lead to ConcurrentModificationException ,or worse, silent reading of incorrect data.

Here is a better way to design this. The planner shouldnt have to understand these low-level optimizations. Rather the consumer pool should be smart enough avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag inuse is not checked when returning a cached consumer, see [this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that should be closed when the task is done. Then the planner does not have to have a flag to avoid reuses.

This PR is a step towards that goal. It does the following.
- There are effectively two kinds of consumer that may be generated
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait called KafkaConsumer is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply called `val consumer = KafkaConsumer.acquire` and then `consumer.release()`.
- If there is request for a consumer that is in-use, then a new consumer is generated.
- If there is a concurrent attempt of the same task, then a new consumer is generated, and the existing cached consumer is marked for close upon release.
- In addition, I renamed the classes because CachedKafkaConsumer is a misnomer given that what it returns may or may not be cached.

This PR does not remove the planner flag to avoid reuse to make this patch safe enough for merging in branch-2.3. This can be done later in master-only.

## How was this patch tested?
A new stress test that verifies it is safe to concurrently get consumers for the same partition from the consumer pool.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#20767 from tdas/SPARK-23623.
peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018
…afkaConsumer (branch-2.3)

This is a backport of apache#20767 to branch 2.3

## What changes were proposed in this pull request?
CacheKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one task using trying to read the same Kafka TopicPartition at the same time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. And any cases where this assumption may not be true, we have SparkPlan flag to disable the use of a cache. So it was up to the planner to correctly identify when it was not safe to use the cache and set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, whether there will be multiple tasks in the same query trying to read the same partition. Case in point, 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's pretty non-trivial to figure out how this leads to two tasks reading the same partition twice, possibly concurrently. And due to the non-triviality, it is hard to figure this out in the planner and set the flag to avoid the cache / consumer pool. And this can inadvertently lead to ConcurrentModificationException ,or worse, silent reading of incorrect data.

Here is a better way to design this. The planner shouldnt have to understand these low-level optimizations. Rather the consumer pool should be smart enough avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag inuse is not checked when returning a cached consumer, see [this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that should be closed when the task is done. Then the planner does not have to have a flag to avoid reuses.

This PR is a step towards that goal. It does the following.
- There are effectively two kinds of consumer that may be generated
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait called KafkaConsumer is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply called `val consumer = KafkaConsumer.acquire` and then `consumer.release()`.
- If there is request for a consumer that is in-use, then a new consumer is generated.
- If there is a concurrent attempt of the same task, then a new consumer is generated, and the existing cached consumer is marked for close upon release.
- In addition, I renamed the classes because CachedKafkaConsumer is a misnomer given that what it returns may or may not be cached.

This PR does not remove the planner flag to avoid reuse to make this patch safe enough for merging in branch-2.3. This can be done later in master-only.

## How was this patch tested?
A new stress test that verifies it is safe to concurrently get consumers for the same partition from the consumer pool.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#20848 from tdas/SPARK-23623-2.3.
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
…afkaConsumer

CacheKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one task using trying to read the same Kafka TopicPartition at the same time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. And any cases where this assumption may not be true, we have SparkPlan flag to disable the use of a cache. So it was up to the planner to correctly identify when it was not safe to use the cache and set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, whether there will be multiple tasks in the same query trying to read the same partition. Case in point, 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's pretty non-trivial to figure out how this leads to two tasks reading the same partition twice, possibly concurrently. And due to the non-triviality, it is hard to figure this out in the planner and set the flag to avoid the cache / consumer pool. And this can inadvertently lead to ConcurrentModificationException ,or worse, silent reading of incorrect data.

Here is a better way to design this. The planner shouldnt have to understand these low-level optimizations. Rather the consumer pool should be smart enough avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag inuse is not checked when returning a cached consumer, see [this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that should be closed when the task is done. Then the planner does not have to have a flag to avoid reuses.

This PR is a step towards that goal. It does the following.
- There are effectively two kinds of consumer that may be generated
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait called KafkaConsumer is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply called `val consumer = KafkaConsumer.acquire` and then `consumer.release()`.
- If there is request for a consumer that is in-use, then a new consumer is generated.
- If there is a concurrent attempt of the same task, then a new consumer is generated, and the existing cached consumer is marked for close upon release.
- In addition, I renamed the classes because CachedKafkaConsumer is a misnomer given that what it returns may or may not be cached.

This PR does not remove the planner flag to avoid reuse to make this patch safe enough for merging in branch-2.3. This can be done later in master-only.

A new stress test that verifies it is safe to concurrently get consumers for the same partition from the consumer pool.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#20767 from tdas/SPARK-23623.

Ref: LIHADOOP-48531

RB=1845034
A=
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants