[SPARK-21501] Change CacheLoader to limit entries based on memory footprint #18940
Conversation
ok to test
LGTM, btw, no unit tests for the change?
Test build #80636 has finished for PR 18940 at commit
Can you fix the title? Doesn't look like this is related to SPARK-734.
-      .maximumSize(indexCacheEntries).build(indexCacheLoader);
+    shuffleIndexCache =
+        CacheBuilder.newBuilder()
+            .maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
nit: these lines are indented way too far (the previous code was, too).
See block above this one for example.
Yeah, the previous code actually led me to follow that convention; ok, will revert to 2-space indentation, thanks.
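For context on the diff above: the change swaps Guava's entry-count cap (maximumSize) for a byte-weighted cap (maximumWeight plus a Weigher). A minimal sketch of that pattern, assuming a ShuffleIndexInformation value type whose getSize() reports an entry's in-memory footprint; the wrapper class and method here are illustrative, not the PR's exact code:

```java
import java.io.File;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;

class IndexCacheSketch {
  // Build a cache capped by total bytes rather than by entry count.
  static LoadingCache<File, ShuffleIndexInformation> build(
      CacheLoader<File, ShuffleIndexInformation> indexCacheLoader, long maxBytes) {
    return CacheBuilder.newBuilder()
        .maximumWeight(maxBytes)  // e.g. JavaUtils.byteStringAsBytes("100m")
        .weigher(new Weigher<File, ShuffleIndexInformation>() {
          @Override
          public int weigh(File file, ShuffleIndexInformation info) {
            return info.getSize();  // assumed: this entry's footprint in bytes
          }
        })
        .build(indexCacheLoader);
  }
}
```

With a weigher installed, Guava evicts entries once the summed weights exceed maximumWeight, so the cap tracks actual memory use instead of a fixed entry count.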
@dbolshak there were no unit tests for the Google cache implementation here before. I could add a simple test to check the cache behavior if necessary, but ideally a scale test is needed to understand the shuffleIndexCache behavior.
I like this feature.
nit: title should be "
@kiszk I don't think that would be ideal; it is better to backport the feature itself to a desired version or branch. Having two conflicting configs for the same task is not ideal, if that is what you mean. Thanks.
@redsanket I am thinking about the case where the same configuration file, which explicitly sets a value (e.g. 4096) into spark.shuffle.service.index.cache.entries, is reused with a newer Spark version. What do you think?
@kiszk wouldn't the updated release notes/docs take care of that, i.e. which configs can no longer be used and which replace them? I don't mind adding a warning message saying to please use cache.size instead of cache.entries, or providing two alternate implementations based on entries/size. I would like to see what other PMCs think about this @tgravescs @vanzin
If you're removing a public config, you should at least add it to the list of deprecated configs in SparkConf.
Test build #80691 has finished for PR 18940 at commit
Test build #80696 has finished for PR 18940 at commit
@@ -104,15 +105,21 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile,
       Executor directoryCleaner) throws IOException {
     this.conf = conf;
     this.registeredExecutorFile = registeredExecutorFile;
-    int indexCacheEntries = conf.getInt("spark.shuffle.service.index.cache.entries", 1024);
+    String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
Let's create a new config in internal/config, then we can use it by package$.MODULE$.SHUFFLE_INDEX_CACHE_SIZE().
internal/config is in spark core, not in the common/network code. We shouldn't be using the core version here, as it would add an extra dependency that isn't needed and would then have to be shipped in things like the external shuffle jar.
I prefer to leave it as is; if we want a config builder in the common network code, we can do that as a separate JIRA.
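As background on the "100m" default in the diff above: JavaUtils.byteStringAsBytes lives in common/network-common (hence no spark-core dependency is needed, per the comment above) and converts human-readable size strings to bytes using binary units. A quick sketch:

```java
import org.apache.spark.network.util.JavaUtils;

public class SizeStringSketch {
  public static void main(String[] args) {
    // Spark size suffixes are binary: "100m" -> 100 * 1024 * 1024 bytes.
    System.out.println(JavaUtils.byteStringAsBytes("100m"));  // 104857600
    System.out.println(JavaUtils.byteStringAsBytes("1g"));    // 1073741824
  }
}
```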
+1. Any further comments? @vanzin @jiangxb1987
@@ -597,7 +597,8 @@ private[spark] object SparkConf extends Logging {
     DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
       "Please use the new blacklisting options, spark.blacklist.*"),
     DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
-    DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more")
+    DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more"),
+    DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0", "Not used any more")
It would be good to mention the new config that's replacing it in the warning message.
+1. We'd better let the user know what the alternative config is.
LGTM, also cc @cloud-fan
@vanzin addressed the config comment, thanks
Test build #80931 has finished for PR 18940 at commit
+1, I'm going to merge as it appears all comments are addressed.
Right now the Spark shuffle service has a cache for index files that is limited by the number of files cached (spark.shuffle.service.index.cache.entries). This can cause issues for jobs with a lot of reducers, because the size of each entry fluctuates with the number of reducers.
We saw an issue with a job that had 170,000 reducers: it caused the NM running the Spark shuffle service to use 700-800 MB of memory by itself.
We should change this cache to be memory based and only allow it a certain amount of memory; that is, the cache should have a limit of, say, 100 MB.
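To put numbers on that: a shuffle index file stores one 8-byte offset per partition (plus one), so each cache entry's footprint grows linearly with the reducer count. A back-of-the-envelope sketch, assuming that (numPartitions + 1) * 8-byte layout:

```java
public class IndexSizeEstimate {
  public static void main(String[] args) {
    long reducers = 170_000;
    long bytesPerIndex = (reducers + 1) * 8;          // one offset per partition, plus one
    long oldCapBytes = 1024 * bytesPerIndex;          // old cap: 1024 entries regardless of size
    System.out.println(bytesPerIndex);                // 1360008 (~1.3 MB per entry)
    System.out.println(oldCapBytes / (1024 * 1024));  // 1328 (MB the old cap could retain)
  }
}
```

At ~1.3 MB per entry, the old 1024-entry cap could in principle retain well over a gigabyte, which lines up with the 700-800 MB observed above.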
https://issues.apache.org/jira/browse/SPARK-21501
Manual testing with 170,000 reducers has been performed, with the cache loaded up to the default 100 MB limit and each shuffle index file about 1.3 MB in size. Eviction takes place as soon as the total cache size reaches the 100 MB limit, and the evicted objects become eligible for garbage collection, thereby preventing the NM from crashing. No notable difference in runtime has been observed.
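As a self-contained illustration of that eviction behavior (toy sizes, not the actual shuffle-service code):

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Weigher;

public class EvictionDemo {
  public static void main(String[] args) {
    // Toy cache capped at 10 bytes of total value weight.
    Cache<String, byte[]> cache = CacheBuilder.newBuilder()
        .maximumWeight(10)
        .weigher(new Weigher<String, byte[]>() {
          @Override
          public int weigh(String key, byte[] value) {
            return value.length;  // each entry weighs its payload size
          }
        })
        .build();

    cache.put("a", new byte[4]);
    cache.put("b", new byte[4]);
    cache.put("c", new byte[4]);  // total weight 12 > 10, so one entry is evicted

    // The evicted value becomes unreferenced and eligible for GC.
    System.out.println(cache.size());  // 2
  }
}
```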