[SPARK-21501] Change CacheLoader to limit entries based on memory footprint

Right now the Spark shuffle service has a cache for index files. It is bounded by the number of files cached (spark.shuffle.service.index.cache.entries). This can cause issues for jobs with many reducers, because the size of each entry fluctuates with the number of reducers.
We saw an issue with a job that had 170000 reducers: the NodeManager (NM) running the Spark shuffle service used 700-800MB of memory for this cache alone.
This change makes the cache memory based: instead of capping the number of entries, it caps the total memory footprint at a configurable limit, 100MB by default.

https://issues.apache.org/jira/browse/SPARK-21501

Manual testing with 170000 reducers was performed, with the cache loaded up to its default 100MB limit and each shuffle index file about 1.3MB in size. Eviction takes place as soon as the total cache size reaches the 100MB limit, and evicted entries become eligible for garbage collection, which keeps the NM from crashing. No notable difference in runtime was observed.
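For reference, a minimal standalone sketch of the Guava pattern the patch adopts: a cache capped by total weight in bytes rather than by entry count. The class name, key and value types, and sizes here are illustrative, not taken from the patch.

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;

public class WeightBoundedCacheDemo {
  public static void main(String[] args) throws Exception {
    // Cap the cache by the sum of entry weights (bytes here), not by entry count.
    LoadingCache<String, byte[]> cache = CacheBuilder.newBuilder()
        .maximumWeight(100L * 1024 * 1024)            // 100MB total
        .weigher(new Weigher<String, byte[]>() {
          public int weigh(String key, byte[] value) {
            return value.length;                      // each entry weighs its size in bytes
          }
        })
        .build(new CacheLoader<String, byte[]>() {
          public byte[] load(String key) {
            return new byte[1024];                    // stand-in for reading an index file
          }
        });
    cache.get("shuffle_0_0.index");                   // loads, then counts toward the cap
  }
}

Guava evicts least-recently-used entries once the summed weights exceed the bound, so memory use stays capped no matter how large individual entries get.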

Author: Sanket Chintapalli <schintap@yahoo-inc.com>

Closes #18940 from redsanket/SPARK-21501.
Sanket Chintapalli authored and Tom Graves committed Aug 23, 2017
1 parent d6b30ed commit 1662e93
Showing 5 changed files with 29 additions and 7 deletions.
TransportConf.java
@@ -67,6 +67,10 @@ public int getInt(String name, int defaultValue) {
     return conf.getInt(name, defaultValue);
   }
 
+  public String get(String name, String defaultValue) {
+    return conf.get(name, defaultValue);
+  }
+
   private String getConfKey(String suffix) {
     return "spark." + module + "." + suffix;
   }
ExternalShuffleBlockResolver.java
@@ -33,6 +33,7 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.cache.Weigher;
 import com.google.common.collect.Maps;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBIterator;
@@ -104,15 +105,21 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
       Executor directoryCleaner) throws IOException {
     this.conf = conf;
     this.registeredExecutorFile = registeredExecutorFile;
-    int indexCacheEntries = conf.getInt("spark.shuffle.service.index.cache.entries", 1024);
+    String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
     CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
         new CacheLoader<File, ShuffleIndexInformation>() {
           public ShuffleIndexInformation load(File file) throws IOException {
             return new ShuffleIndexInformation(file);
           }
         };
     shuffleIndexCache = CacheBuilder.newBuilder()
-        .maximumSize(indexCacheEntries).build(indexCacheLoader);
+        .maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
+        .weigher(new Weigher<File, ShuffleIndexInformation>() {
+          public int weigh(File file, ShuffleIndexInformation indexInfo) {
+            return indexInfo.getSize();
+          }
+        })
+        .build(indexCacheLoader);
     db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
     if (db != null) {
       executors = reloadRegisteredExecutors(db);
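The new bound is parsed from a size string by Spark's JavaUtils.byteStringAsBytes, which uses binary suffixes. A small sketch of what it accepts (the class name here is mine):

import org.apache.spark.network.util.JavaUtils;

public class ByteStringDemo {
  public static void main(String[] args) {
    // Spark size strings use binary units, so "100m" is 100 * 1024 * 1024 bytes.
    System.out.println(JavaUtils.byteStringAsBytes("100m")); // prints 104857600
    // A bare number is taken as bytes; suffixes such as "512k" and "1g" also work.
  }
}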
ShuffleIndexInformation.java
@@ -31,9 +31,10 @@
 public class ShuffleIndexInformation {
   /** offsets as long buffer */
   private final LongBuffer offsets;
+  private int size;
 
   public ShuffleIndexInformation(File indexFile) throws IOException {
-    int size = (int)indexFile.length();
+    size = (int)indexFile.length();
     ByteBuffer buffer = ByteBuffer.allocate(size);
     offsets = buffer.asLongBuffer();
     DataInputStream dis = null;
@@ -47,6 +48,14 @@ public ShuffleIndexInformation(File indexFile) throws IOException {
     }
   }
 
+  /**
+   * Size of the index file
+   * @return size
+   */
+  public int getSize() {
+    return size;
+  }
+
   /**
    * Get index offset for a particular reducer.
    */
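A back-of-the-envelope check of the weights involved, assuming the standard index-file layout of one 8-byte offset per partition plus one (the figures below are derived, not from the patch):

public class IndexCacheMath {
  public static void main(String[] args) {
    long numReducers = 170000;
    long indexFileBytes = (numReducers + 1) * 8L;        // 1360008 bytes, about 1.3MB
    long cacheCapBytes = 100L * 1024 * 1024;             // the 100m default cap
    System.out.println(cacheCapBytes / indexFileBytes);  // about 77 index files stay resident
  }
}

This matches the roughly 1.3MB per index file observed in the manual test, and it makes the worst case independent of reducer count: the old default of 1024 entries could in principle have pinned over 1.3GB for files of this size.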
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -597,7 +597,9 @@ private[spark] object SparkConf extends Logging {
     DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
       "Please use the new blacklisting options, spark.blacklist.*"),
     DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
-    DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more")
+    DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more"),
+    DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0",
+      "Not used any more. Please use spark.shuffle.service.index.cache.size")
   )
 
   Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
6 changes: 3 additions & 3 deletions docs/configuration.md
@@ -627,10 +627,10 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.shuffle.service.index.cache.entries</code></td>
-  <td>1024</td>
+  <td><code>spark.shuffle.service.index.cache.size</code></td>
+  <td>100m</td>
   <td>
-    Max number of entries to keep in the index cache of the shuffle service.
+    Cache entries limited to the specified memory footprint.
   </td>
 </tr>
 <tr>
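A hedged usage sketch, not from the patch: the new key takes a Spark size string, and the old entry-count key is deprecated in its favor. To halve the default cap you might set, for example:

spark.shuffle.service.index.cache.size  50m

Where this line lives depends on the deployment; for the YARN auxiliary shuffle service the property is read on the NodeManager side rather than from the application's SparkConf.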
