diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 88256b810bf04..fa2ff42de07d0 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -67,6 +67,10 @@ public int getInt(String name, int defaultValue) { return conf.getInt(name, defaultValue); } + public String get(String name, String defaultValue) { + return conf.get(name, defaultValue); + } + private String getConfKey(String suffix) { return "spark." + module + "." + suffix; } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index d7ec0e299dead..e6399897be9c2 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -33,6 +33,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.cache.Weigher; import com.google.common.collect.Maps; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; @@ -104,7 +105,7 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF Executor directoryCleaner) throws IOException { this.conf = conf; this.registeredExecutorFile = registeredExecutorFile; - int indexCacheEntries = conf.getInt("spark.shuffle.service.index.cache.entries", 1024); + String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m"); CacheLoader<File, ShuffleIndexInformation> indexCacheLoader = new CacheLoader<File, ShuffleIndexInformation>() { public ShuffleIndexInformation load(File file) throws
IOException { @@ -112,7 +113,13 @@ public ShuffleIndexInformation load(File file) throws IOException { } }; shuffleIndexCache = CacheBuilder.newBuilder() - .maximumSize(indexCacheEntries).build(indexCacheLoader); + .maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize)) + .weigher(new Weigher<File, ShuffleIndexInformation>() { + public int weigh(File file, ShuffleIndexInformation indexInfo) { + return indexInfo.getSize(); + } + }) + .build(indexCacheLoader); db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper); if (db != null) { executors = reloadRegisteredExecutors(db); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java index 39ca9ba574853..386738ece51a6 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java @@ -31,9 +31,10 @@ public class ShuffleIndexInformation { /** offsets as long buffer */ private final LongBuffer offsets; + private int size; public ShuffleIndexInformation(File indexFile) throws IOException { - int size = (int)indexFile.length(); + size = (int)indexFile.length(); ByteBuffer buffer = ByteBuffer.allocate(size); offsets = buffer.asLongBuffer(); DataInputStream dis = null; @@ -47,6 +48,14 @@ public ShuffleIndexInformation(File indexFile) throws IOException { } } + /** + * Size of the index file + * @return size + */ + public int getSize() { + return size; + } + /** * Get index offset for a particular reducer. 
*/ diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 715cfdcc8f4ef..e61f943af49f2 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -597,7 +597,9 @@ private[spark] object SparkConf extends Logging { DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0", "Please use the new blacklisting options, spark.blacklist.*"), DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"), - DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more") + DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more"), + DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0", + "Not used any more. Please use spark.shuffle.service.index.cache.size") ) Map(configs.map { cfg => (cfg.key -> cfg) } : _*) diff --git a/docs/configuration.md b/docs/configuration.md index e7c0306920e08..6e9fe591b70a3 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -627,10 +627,10 @@ Apart from these, the following properties are also available, and may be useful - <td><code>spark.shuffle.service.index.cache.entries</code></td> - <td>1024</td> + <td><code>spark.shuffle.service.index.cache.size</code></td> + <td>100m</td> - Max number of entries to keep in the index cache of the shuffle service. + Cache entries limited to the specified memory footprint.