[SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer #22138

Closed
wants to merge 13 commits

Conversation

@HeartSaVioR (Contributor) commented Aug 17, 2018

What changes were proposed in this pull request?

This patch pools both Kafka consumers and fetched data. The overall benefits of the patch are as follows:

  • Both pools support eviction of idle objects, which will help close invalid idle objects whose topic or partition is no longer assigned to any task.
  • It also enables applying different policies per pool, which helps optimize pooling for each pool.
  • We were concerned about multiple tasks pointing to the same topic partition with the same group id; the existing code can't handle this, so excess seeks and fetches could happen. This patch handles the case properly.
  • It also makes it always safe for the code to leverage the cache, so there is no need to maintain the reuseCache parameter.

Moreover, pooling of Kafka consumers is implemented on top of Apache Commons Pool, which also brings a couple of benefits:

  • We can get rid of the synchronization on the KafkaDataConsumer object while acquiring and returning an InternalKafkaConsumer.
  • We can extract the object pool feature outside of the class, so that the behavior of the pool can be tested easily.
  • We can get various statistics for the object pool, and we can also enable JMX for the pool (a minimal sketch follows this list).
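
To make the keyed-pool idea concrete, here is a minimal, self-contained sketch, not the patch's code: names such as SimpleConsumerPool and ConsumerFactory are illustrative, a single kafkaParams map is shared across keys for brevity (the actual patch passes kafkaParams per key), and the config values are placeholders.

```scala
import java.{util => ju}

import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object SimpleConsumerPool {
  type ByteConsumer = KafkaConsumer[Array[Byte], Array[Byte]]

  // One pooled consumer per (group id, topic partition) pair.
  case class CacheKey(groupId: String, topicPartition: TopicPartition)

  // Tells Commons Pool how to create, wrap and destroy consumers.
  class ConsumerFactory(kafkaParams: ju.Map[String, Object])
    extends BaseKeyedPooledObjectFactory[CacheKey, ByteConsumer] {

    override def create(key: CacheKey): ByteConsumer = {
      val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
      consumer.assign(ju.Collections.singletonList(key.topicPartition))
      consumer
    }

    override def wrap(c: ByteConsumer): PooledObject[ByteConsumer] = new DefaultPooledObject(c)

    override def destroyObject(key: CacheKey, p: PooledObject[ByteConsumer]): Unit =
      p.getObject.close()
  }

  def create(kafkaParams: ju.Map[String, Object]): GenericKeyedObjectPool[CacheKey, ByteConsumer] = {
    val conf = new GenericKeyedObjectPoolConfig[ByteConsumer]
    conf.setMaxTotal(64)              // hard capacity across all keys
    conf.setBlockWhenExhausted(false) // throw instead of blocking a Spark task when exhausted
    conf.setJmxEnabled(true)          // expose pool statistics via JMX
    new GenericKeyedObjectPool(new ConsumerFactory(kafkaParams), conf)
  }
}
```

Borrowers then call pool.borrowObject(key) and hand the consumer back with pool.returnObject(key, consumer) (or pool.invalidateObject on error), which is what removes the need to synchronize on the consumer wrapper itself.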

FetchedData instances are pooled by a custom pool implementation instead of Apache Commons Pool, because they are keyed by CacheKey as the first key and a "desired offset" as the second key, and the "desired offset" keeps changing - I haven't found any general pool implementation supporting this.
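
As a rough illustration of this custom second-level pooling (the names FetchedData, FetchedDataPool, acquire and release are simplified stand-ins, and the real FetchedData carries the fetched records, not just an offset):

```scala
import scala.collection.mutable

import org.apache.kafka.common.TopicPartition

case class CacheKey(groupId: String, topicPartition: TopicPartition)

// Stand-in for the real FetchedData, which also holds the fetched records.
class FetchedData(var nextOffsetToFetch: Long)

class FetchedDataPool {
  private val cache = mutable.Map.empty[CacheKey, mutable.ArrayBuffer[FetchedData]]

  // First-level lookup by CacheKey, second-level match on the desired offset,
  // which keeps changing as data is consumed - hence the custom implementation.
  def acquire(key: CacheKey, desiredOffset: Long): FetchedData = synchronized {
    val entries = cache.getOrElseUpdate(key, mutable.ArrayBuffer.empty)
    entries.find(_.nextOffsetToFetch == desiredOffset) match {
      case Some(data) =>
        entries -= data
        data
      case None =>
        new FetchedData(desiredOffset) // no match: start a fresh fetch from the desired offset
    }
  }

  def release(key: CacheKey, data: FetchedData): Unit = synchronized {
    cache.getOrElseUpdate(key, mutable.ArrayBuffer.empty) += data
  }
}
```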

This patch adds a new dependency, Apache Commons Pool 2.6.0, to the spark-sql-kafka-0-10 module.

How was this patch tested?

Existing unit tests as well as new tests for the object pool.

Also ran an experiment to verify concurrent access of consumers for the same topic partition.

Topic and data distribution is as follows:

truck_speed_events_stream_spark_25151_v1:0:99440
truck_speed_events_stream_spark_25151_v1:1:99489
truck_speed_events_stream_spark_25151_v1:2:397759
truck_speed_events_stream_spark_25151_v1:3:198917
truck_speed_events_stream_spark_25151_v1:4:99484
truck_speed_events_stream_spark_25151_v1:5:497320
truck_speed_events_stream_spark_25151_v1:6:99430
truck_speed_events_stream_spark_25151_v1:7:397887
truck_speed_events_stream_spark_25151_v1:8:397813
truck_speed_events_stream_spark_25151_v1:9:0

The experiment only used the 4 smallest partitions (0, 1, 4, 6) in order to finish the query earlier.

The result of the experiment is below:

| branch | create Kafka consumer | fetch request |
|--------|-----------------------|---------------|
| master | 1986                  | 2837          |
| patch  | 8                     | 1706          |

@SparkQA commented Aug 17, 2018

Test build #94912 has finished for PR 22138 at commit c82f306.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class PooledObjectInvalidated(key: CacheKey, lastInvalidatedTimestamp: Long,
  • class PoolConfig extends GenericKeyedObjectPoolConfig[InternalKafkaConsumer]
  • case class CacheKey(groupId: String, topicPartition: TopicPartition)

CachedKafkaDataConsumer(pool.borrowObject(key, kafkaParams))
} catch { case _: NoSuchElementException =>
// There's neither an idle object to clean up nor available space in the pool:
// fall back to creating a non-cached consumer
Contributor Author:

This approach introduces a behavior change: previously the capacity worked like a soft limit, and the cache allowed adding an item even when there was neither an idle object nor free space.

The new behavior of KafkaDataConsumer is to create non-cached consumers whenever the pool is exhausted and there is no idle object to free up.

I think it is not a big deal if we configure "spark.sql.kafkaConsumerCache.capacity" properly, and a hard capacity makes it easier to determine what's going on.

However, we can still mimic the current behavior by configuring an effectively infinite capacity, so we can go back to the current behavior if we feel it makes more sense.
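
For reference, a minimal standalone demo of the Commons Pool behavior this fallback relies on (not Spark code; the factory and key here are just placeholders): with blockWhenExhausted set to false, borrowObject throws NoSuchElementException once capacity is reached and no idle object is available, which the caller can catch to fall back to a non-cached object.

```scala
import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}

object ExhaustionFallbackDemo extends App {
  class StringBuilderFactory extends BaseKeyedPooledObjectFactory[String, StringBuilder] {
    override def create(key: String): StringBuilder = new StringBuilder(key)
    override def wrap(v: StringBuilder): PooledObject[StringBuilder] = new DefaultPooledObject(v)
  }

  val conf = new GenericKeyedObjectPoolConfig[StringBuilder]
  conf.setMaxTotalPerKey(1)         // hard capacity of one object per key
  conf.setBlockWhenExhausted(false) // do not block the caller when the pool is exhausted

  val pool = new GenericKeyedObjectPool(new StringBuilderFactory, conf)

  val pooled = pool.borrowObject("key") // succeeds; the key is now at capacity
  val fallback = try {
    pool.borrowObject("key")            // no idle object and no free space: throws
  } catch {
    case _: java.util.NoSuchElementException =>
      new StringBuilder("key")          // fall back to a non-pooled (non-cached) object
  }
  println(s"fell back to a non-pooled object: ${fallback ne pooled}")

  pool.returnObject("key", pooled)      // only pooled objects go back to the pool
  pool.close()
}
```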


@SparkQA commented Aug 18, 2018

Test build #94914 has finished for PR 22138 at commit fd728ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 18, 2018

Test build #94913 has finished for PR 22138 at commit 94231fe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger (Contributor)

If you have multiple consumers for a given key, and those consumers are at different offsets, isn't it likely that the client code will not get the right consumer, leading to extra seeking?

@HeartSaVioR (Contributor Author) commented Aug 19, 2018

@koeninger
I'm not sure I got your point correctly. This patch is based on some assumptions, so please correct me if I'm missing something. The assumptions are:

  1. There are actually no multiple consumers for a given key working at the same time. The cache key contains the topic partition as well as the group id, which denotes a specific partition of a source. Even if the query does a self-join and reads the same topic in two different sources, I think the group ids should be different.

  2. In the normal case the offsets will be continuous, and that's why the cache should help. In the retry case this patch invalidates the cache, same as the current behavior, so it starts from scratch.

(Btw, I'm curious which is more expensive: reusing a pooled object but resetting the Kafka consumer, vs. invalidating pooled objects and starting from scratch. The latter feels safer, but if we only need an extra seek instead of reconnecting to Kafka, the former would be cheaper. I feel it is out of scope of this PR though.)

This patch keeps most of the current behaviors, except for two spots I guess. I already commented on one spot to explain why I changed the behavior, and I'll comment on the other spot as well.

} else if (existingInternalConsumer.inUse) {
// If consumer is already cached but is currently in use, then return a new consumer
NonCachedKafkaDataConsumer(newInternalConsumer)
// borrow a consumer from pool even in this case
Contributor Author:

This is another behavior change: if this attempt succeeds, we can use the pooled consumer from the next batch, so there's no reason to discard the consumer.

But I also see the cost of unnecessary pooling if failures occur continuously.

So this looks like a decision between the possibility of success vs. the possibility of failure (again). While I decided to cache it, it is pretty easy to go back to the current behavior, so please let me know if we think the current behavior makes more sense.

@koeninger (Contributor) commented Aug 20, 2018 via email

@HeartSaVioR (Contributor Author)

If my understanding is right, it looks like the current approach has the same limitation. I guess you're busy, but could you refer to an issue number or point out the code lines behind that reasoning, if you remember any? It would help to determine whether this patch breaks more spots or not.

@koeninger (Contributor) commented Aug 20, 2018 via email

@HeartSaVioR (Contributor Author) commented Aug 20, 2018

@koeninger

I'm not sure, but are you saying that an executor handles multiple queries (multiple jobs) concurrently? I honestly didn't notice that. If that is going to be a problem, we should add something (could we get the query id at that point?) to the cache key to differentiate consumers. If we want to avoid extra seeking due to different offsets, consumers should not be reused among multiple queries, and that's just a matter of the cache key.

If you are thinking about sharing consumers among multiple queries in order to reuse the connection to Kafka, I think extra seeking is unavoidable (I guess fetched data would be much more critical unless we never reuse it after returning it to the pool). If seeking is a light operation, we may even go with only reusing the connection (not the position we already sought): always resetting the position (and maybe the data?) when borrowing from the pool or returning the consumer to the pool.

Btw, the rationale of this patch is not solving the issue you're referring to. This patch is also based on #20767 but deals with other improvements pointed out in the comments: adopting a pool library to avoid reinventing the wheel, and enabling metrics for the pool.

I'm not sure the issue you're referring to is a serious one (a show-stopper): if it is serious, someone should have handled it once we became aware of it in March, or at least a relevant JIRA issue should have been filed with a detailed explanation. I'd like to ask you to handle (or file) the issue since you may know it best.

@koeninger (Contributor) commented Aug 21, 2018 via email

@HeartSaVioR (Contributor Author) commented Aug 21, 2018

@koeninger
Yeah, I see what you're saying. Then IMHO isolating consumers per query sounds better than the other options. Adding the next offset to the cache key would make a consumer move between buckets in the cache every time it is used, which is not the expected behavior for a general pool solution, so we would have to reinvent the wheel (and it is not an ideal situation for caching either).

There's an evictor thread in Apache Commons Pool running in the background, and we could close consumers that have been idle for a long time (say 5 minutes or more). That's another benefit of adopting Apache Commons Pool (probably available in most general pool solutions): we could also eventually evict cached consumers whose topic or partition is removed while the query is running. Objects are evicted not only because the cache capacity is exceeded, but also because of inactivity, as sketched below.
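
A minimal sketch of wiring that up with Commons Pool's built-in evictor (the numbers are illustrative, not the values chosen in this patch):

```scala
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig

object EvictionConfigSketch {
  def configWithEviction[T](): GenericKeyedObjectPoolConfig[T] = {
    val conf = new GenericKeyedObjectPoolConfig[T]
    // The background evictor thread wakes up this often...
    conf.setTimeBetweenEvictionRunsMillis(60 * 1000L)
    // ...and closes idle objects unused for at least this long, e.g. consumers whose
    // topic or partition is no longer assigned to any task.
    conf.setMinEvictableIdleTimeMillis(5 * 60 * 1000L)
    // Each evictor run only examines a limited number of idle objects.
    conf.setNumTestsPerEvictionRun(10)
    conf
  }
}
```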

@HeartSaVioR (Contributor Author)

I just added eviction to the consumer pool as well as a relevant test. This will help close invalid idle consumers whose topic or partition is no longer assigned to any task. I guess the current cache is not capable of closing invalid consumers.

I haven't found how to add the "query id" to the cache key, but IMHO the patch itself already provides enough value to be merged. It would be even better if someone could advise on how to add the "query id" to the cache key.

@koeninger @tdas @zsxwing Please take a look and comment. Thanks in advance!

@SparkQA commented Aug 24, 2018

Test build #95206 has finished for PR 22138 at commit 859a9d0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor Author)

Rebased to resolve conflict.

@SparkQA commented Aug 29, 2018

Test build #95394 has finished for PR 22138 at commit ba576e8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Borrowed object must be returned by either calling returnObject or invalidateObject, otherwise
* the object will be kept in pool as active object.
*/
def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): InternalKafkaConsumer = {
Contributor:

Why is kafkaParams passed as the second argument?
As I see it, CacheKey itself is constructed from kafkaParams, so wouldn't it be better to store kafkaParams in a val within CacheKey?

Then objectFactory.keyToKafkaParams could be deleted along with updateKafkaParamForKey, couldn't it?

Contributor Author:

It is to reduce unnecessary computation from comparing the map while accessing the pool. You can see CacheKey is kept as it is, and I guess CacheKey was designed for the same reason.

Contributor:

Oh I see. What about changing CacheKey from a case class to a class where kafkaParams is a member, but whose equals and hashCode methods do not use kafkaParams?
As these values go together, I have the feeling that encapsulating them is better than keeping their relation in a separate map (keyToKafkaParams). It is just an idea.

Contributor Author:

That sounds good, but let's wait for input from committers, since CacheKey was designed earlier and is not introduced in this patch.
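
For illustration, a sketch of the suggested shape (hypothetical, not the patch's code): kafkaParams travels with the key, but equals and hashCode only look at groupId and topicPartition, so pool lookups behave exactly as with the current case class.

```scala
import java.{util => ju}

import org.apache.kafka.common.TopicPartition

class CacheKey(
    val groupId: String,
    val topicPartition: TopicPartition,
    val kafkaParams: ju.Map[String, Object]) {

  // kafkaParams is intentionally excluded from equality so that key comparison stays cheap.
  override def equals(other: Any): Boolean = other match {
    case that: CacheKey =>
      groupId == that.groupId && topicPartition == that.topicPartition
    case _ => false
  }

  override def hashCode(): Int = 31 * groupId.hashCode + topicPartition.hashCode
}
```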

// invalidate all idle consumers for the key
pool.clear(key)

pool.getNumActive()
Contributor:

Is this call (getNumActive) really necessary?

Contributor Author:

My bad. Will remove.

CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS,
DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS)

// NOTE: Below lines define the behavior of CachedInternalKafkaConsumerPool, so do not modify
Contributor:

I have not found the referenced CachedInternalKafkaConsumerPool. I guess you mean InternalKafkaConsumerPool, don't you?

Contributor Author:

Yeah, I changed the class name and missed replacing it with the new name here. Will fix.

import PoolConfig._

val conf = SparkEnv.get.conf
val capacity = conf.getInt(CONFIG_NAME_CAPACITY, DEFAULT_VALUE_CAPACITY)
Contributor:

Is there a common place where we should document these configurations?

Contributor Author:

Hmm... I think spark.sql.kafkaConsumerCache.capacity wasn't documented anywhere, but that may be something to fix. The patch will provide more configurations for end users to tune, so maybe it's worth adding them to some docs? Not 100% sure where, though.

@HeartSaVioR (Contributor Author)

Thanks @attilapiros, I fixed the spots you left comments on.

@SparkQA commented Aug 30, 2018

Test build #95450 has finished for PR 22138 at commit 7d13ee5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 30, 2018

Test build #95472 has finished for PR 22138 at commit 648550a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor Author)

I just applied a new approach: "separation of concerns". This approach pools both Kafka consumers and fetched data.

Both pools support eviction of idle objects, which will help close invalid idle objects whose topic or partition is no longer assigned to any task.

It also enables applying different policies per pool, which helps optimize pooling for each pool.

We were concerned about multiple tasks pointing to the same topic partition with the same group id; the existing code can't handle this, so excess seeks and fetches could happen. This approach handles the case properly.

It also makes it always safe for the code to leverage the cache, so there is no need to maintain the reuseCache parameter.

@koeninger @tdas @zsxwing @arunmahadevan
Could you please take a look at the new approach? I think it solves multiple issues the existing code has.

@HeartSaVioR (Contributor Author)

Updated the PR description to reflect the new approach.

@SparkQA commented Aug 30, 2018

Test build #95474 has finished for PR 22138 at commit 017c0bb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor Author) commented Aug 30, 2018

From the stack trace of the test failure, it doesn't look relevant to the code change, I guess. Jenkins shows REGRESSION, but the test was added on Aug 25, 2018, so it is less than 7 days old, which is not enough history to call it a regression I think. The test itself could be flaky. The test succeeds 5 times in a row in local dev, and I'll try running it more.

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95474/testReport/org.apache.spark.sql.kafka010/KafkaSourceStressForDontFailOnDataLossSuite/stress_test_for_failOnDataLoss_false/

@zsxwing Do you have any idea about this? I'm curious how checking the query exception after query.stop() is relevant to failOnDataLoss = false, given that we check the query exception all the time in the loop.

query.stop()
// `failOnDataLoss` is `false`, we should not fail the query
if (query.exception.nonEmpty) {
  throw query.exception.get
}

If you think the additional check is not necessary after query.stop, I'm happy to provide a minor patch for the fix. At the very least we may want to check the type of the exception and only fail the test when the exception is relevant to failOnDataLoss = false.

Will retrigger the build for now.

EDIT: After looking into the stack trace again, it doesn't look like it's due to the query exception check. Still wondering why it happened, though.

@HeartSaVioR (Contributor Author)

retest this, please

@SparkQA commented Aug 31, 2018

Test build #95502 has finished for PR 22138 at commit 017c0bb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor Author)

Two tests failed, and they are not relevant to this patch.

  • org.apache.spark.scheduler.DAGSchedulerSuite.Barrier task failures from the same stage attempt don't trigger multiple stage retries
    • I've only seen this once, so should we file an issue for it?
  • org.apache.spark.sql.kafka010.KafkaRelationSuite.read Kafka transactional messages: read_committed

Will retrigger again.

@SparkQA commented Aug 30, 2019

Test build #109952 has finished for PR 22138 at commit 04e9ddf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi (Contributor)

At first glance it doesn't look related.

@gaborgsomogyi (Contributor)

retest this please

@SparkQA commented Aug 30, 2019

Test build #109958 has finished for PR 22138 at commit 04e9ddf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


@HeartSaVioR (Contributor Author)

Ah OK, I didn't get your previous comment. The basic characteristics of the current consumer cache have been kept, but some behavior changed slightly and a new feature (JMX metrics) is available, so it sounds better to revise the doc. I'll try revising it. Thanks!

@SparkQA commented Sep 3, 2019

Test build #110025 has finished for PR 22138 at commit 297f47a.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor Author)

========================================================================
Running build tests
========================================================================
exec: curl -s -L https://downloads.lightbend.com/zinc/0.3.15/zinc-0.3.15.tgz
exec: curl -s -L https://downloads.lightbend.com/scala/2.12.8/scala-2.12.8.tgz
exec: curl -s -L https://www.apache.org/dyn/closer.lua?action=download&filename=/maven/maven-3/3.6.1/binaries/apache-maven-3.6.1-bin.tar.gz

gzip: stdin: not in gzip format
tar: Child returned status 1
tar: Error is not recoverable: exiting now
Using `mvn` from path: /home/jenkins/workspace/SparkPullRequestBuilder/build/apache-maven-3.6.1/bin/mvn
build/mvn: line 163: /home/jenkins/workspace/SparkPullRequestBuilder/build/apache-maven-3.6.1/bin/mvn: No such file or directory
Error while getting version string from Maven:

Looks like an intermittent failure.

@HeartSaVioR (Contributor Author)

retest this, please

@SparkQA commented Sep 3, 2019

Test build #110026 has finished for PR 22138 at commit fa12a0a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

The size of the pool is limited by <code>spark.kafka.consumer.cache.capacity</code>,
but it works as "soft-limit" to not block Spark tasks.

Idle eviction thread periodically removes some consumers which are not used. If this threshold is reached when borrowing,
Contributor:

Nit: s/removes some consumers/removes consumers

it tries to remove the least-used entry that is currently not in use.

If it cannot be removed, then the pool will keep growing. In the worst case, the pool will grow to
the max number of concurrent tasks that can run in the executor (that is, number of tasks slots).
Contributor:

Nit: s/tasks slots/task slots

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td>spark.kafka.consumer.fetchedData.cache.timeout</td>
Contributor:

Well, I'm not sure I understand it. It's named as a timeout, but at the same time the description talks about the number of fetched data entries cached. So how should I interpret this?

Contributor Author:

My bad - just a copy-and-paste error. Will fix all the missed spots.

If it cannot be removed, then the cache will keep growing. In the worst case, the cache will grow to
the max number of concurrent tasks that can run in the executor (that is, number of tasks slots),
after which it will never reduce.
The following properties are available to configure the consumer pool:
Contributor:

Is there a reason why I don't see spark.kafka.consumer.cache.evictorThreadRunInterval here?

Contributor Author:

Same kind of omission. Will fix.

@SparkQA commented Sep 4, 2019

Test build #110094 has finished for PR 22138 at commit 74a6cbf.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi (Contributor)

retest this please

@gaborgsomogyi (Contributor) left a comment:

LGTM (pending tests).

<td>64</td>
</tr>
<tr>
<td>spark.kafka.consumer.cache.timeout</td>
Contributor:

Super nit: the actual timeout + evictorThreadRunInterval combination may end up as a ~6 minute total timeout. This shouldn't be a blocker, but maybe it would be better to use evictorThreadRunInterval=1m. The same applies to the other place.

@HeartSaVioR (Contributor Author) commented Sep 4, 2019:

Makes sense. The evictor thread doesn't check all idle objects per run (in Apache Commons Pool), so it's OK to have a shorter interval.

@SparkQA commented Sep 4, 2019

Test build #110113 has finished for PR 22138 at commit 74a6cbf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 4, 2019

Test build #110115 has finished for PR 22138 at commit 68af3d5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) commented Sep 4, 2019

Merging to master.

@vanzin closed this in 594c9c5 on Sep 4, 2019
@HeartSaVioR (Contributor Author)

Thanks all for patiently reviewing and finally merging! That was a long journey!
