[SPARK-21501] Change CacheLoader to limit entries based on memory footprint #18940

Closed · wants to merge 4 commits (changes shown are from 3 of the commits)
@@ -67,6 +67,10 @@ public int getInt(String name, int defaultValue) {
     return conf.getInt(name, defaultValue);
   }
 
+  public String get(String name, String defaultValue) {
+    return conf.get(name, defaultValue);
+  }
+
   private String getConfKey(String suffix) {
     return "spark." + module + "." + suffix;
   }
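For context: this new string getter (on what looks like the network-common TransportConf wrapper, judging from the conf field and the getConfKey helper) lets callers read a human-readable size such as the "100m" default used below, which the PR then converts to bytes with JavaUtils.byteStringAsBytes. A minimal sketch of that round trip, assuming spark-network-common is on the classpath; the class and variable names are invented for illustration:

```java
import org.apache.spark.network.util.JavaUtils;

public class ByteStringSketch {
  public static void main(String[] args) {
    // Default value of spark.shuffle.service.index.cache.size in this PR.
    String indexCacheSize = "100m";
    // byteStringAsBytes understands suffixes like k, m, g as binary multiples.
    long maxWeightBytes = JavaUtils.byteStringAsBytes(indexCacheSize);
    System.out.println(maxWeightBytes); // 104857600, i.e. 100 * 1024 * 1024
  }
}
```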
@@ -33,6 +33,7 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.cache.Weigher;
 import com.google.common.collect.Maps;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBIterator;
@@ -104,15 +105,21 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile
       Executor directoryCleaner) throws IOException {
     this.conf = conf;
     this.registeredExecutorFile = registeredExecutorFile;
-    int indexCacheEntries = conf.getInt("spark.shuffle.service.index.cache.entries", 1024);
+    String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
Contributor: Let's create a new config in internal/config; then we can use it via package$.MODULE$.SHUFFLE_INDEX_CACHE_SIZE().

Contributor: internal/config is in Spark core, not in the common/network code. We shouldn't be using the core version here, as it would add an extra dependency that isn't needed and would then have to be shipped in, e.g., the external shuffle jar. I prefer to leave it as is; if we want a config builder in the common network code, we can do that as a separate JIRA.
     CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
         new CacheLoader<File, ShuffleIndexInformation>() {
           public ShuffleIndexInformation load(File file) throws IOException {
             return new ShuffleIndexInformation(file);
           }
         };
     shuffleIndexCache = CacheBuilder.newBuilder()
-        .maximumSize(indexCacheEntries).build(indexCacheLoader);
+        .maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
+        .weigher(new Weigher<File, ShuffleIndexInformation>() {
+          public int weigh(File file, ShuffleIndexInformation indexInfo) {
+            return indexInfo.getSize();
+          }
+        })
+        .build(indexCacheLoader);
     db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
     if (db != null) {
       executors = reloadRegisteredExecutors(db);
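The substance of the change is above: the cache is now bounded by the total weight of its values (bytes of index data) rather than by a fixed entry count. To see the eviction semantics in isolation, here is a self-contained sketch of the same Guava maximumWeight/Weigher pattern, with byte arrays standing in for ShuffleIndexInformation; all names and sizes are invented for illustration:

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;

public class WeightedCacheSketch {
  public static void main(String[] args) throws Exception {
    LoadingCache<String, byte[]> cache = CacheBuilder.newBuilder()
        // Evict once the sum of weights exceeds 1 MiB, regardless of entry count.
        .maximumWeight(1024 * 1024)
        .weigher((Weigher<String, byte[]>) (key, value) -> value.length)
        .build(new CacheLoader<String, byte[]>() {
          @Override
          public byte[] load(String key) {
            return new byte[64 * 1024]; // pretend each index file is 64 KiB
          }
        });

    for (int i = 0; i < 32; i++) {
      cache.get("shuffle_" + i + "_0.index"); // requests 32 * 64 KiB = 2 MiB total
    }
    // Only about 16 entries (1 MiB / 64 KiB) can remain; the rest were evicted.
    System.out.println(cache.size());
  }
}
```

Note that once a weigher is set, Guava requires maximumWeight instead of maximumSize, which is exactly the swap the diff makes.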
@@ -31,9 +31,10 @@
 public class ShuffleIndexInformation {
   /** offsets as long buffer */
   private final LongBuffer offsets;
+  private int size;
 
   public ShuffleIndexInformation(File indexFile) throws IOException {
-    int size = (int)indexFile.length();
+    size = (int)indexFile.length();
     ByteBuffer buffer = ByteBuffer.allocate(size);
     offsets = buffer.asLongBuffer();
     DataInputStream dis = null;
@@ -47,6 +48,14 @@ public ShuffleIndexInformation(File indexFile) throws IOException {
     }
   }
 
+  /**
+   * Size of the index file
+   * @return size
+   */
+  public int getSize() {
+    return size;
+  }
+
   /**
    * Get index offset for a particular reducer.
    */
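getSize() exposes the index file's length, which the weigher above uses as the entry's weight; that is a reasonable proxy for the entry's memory footprint, since the constructor allocates a ByteBuffer of exactly that size. A shuffle index file is a sequence of 8-byte long offsets, one more than the number of map-output partitions, so the per-entry weight is easy to estimate. A back-of-the-envelope sketch (the partition count is arbitrary):

```java
public class IndexWeightEstimate {
  public static void main(String[] args) {
    int numPartitions = 2000;
    // Index file layout: (numPartitions + 1) offsets of 8 bytes each.
    int indexFileBytes = 8 * (numPartitions + 1);    // 16008 bytes per entry
    long cacheBytes = 100L * 1024 * 1024;            // the new 100m default
    System.out.println(cacheBytes / indexFileBytes); // ~6550 entries fit
  }
}
```

Compared with the old flat limit of 1024 entries, a byte-based cap scales the admitted entry count down automatically when individual index files are large, which is the motivation for this change.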
core/src/main/scala/org/apache/spark/SparkConf.scala (2 additions & 1 deletion)

@@ -597,7 +597,8 @@ private[spark] object SparkConf extends Logging {
     DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
       "Please use the new blacklisting options, spark.blacklist.*"),
     DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
-    DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more")
+    DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more"),
+    DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0", "Not used any more")
Contributor: It would be good to mention the new config that's replacing it in the warning message.

Member: +1. We should let users know what the alternative config is.
   )
 
   Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
docs/configuration.md (3 additions & 3 deletions)

@@ -627,10 +627,10 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.shuffle.service.index.cache.entries</code></td>
-  <td>1024</td>
+  <td><code>spark.shuffle.service.index.cache.size</code></td>
+  <td>100m</td>
   <td>
-    Max number of entries to keep in the index cache of the shuffle service.
+    Cache entries limited to the specified memory footprint.
   </td>
 </tr>
 <tr>
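Finally, a hedged example of what setting the new key looks like from application code; in practice the external shuffle service reads its configuration on the cluster-manager side (for example the YARN NodeManager), so this snippet is purely illustrative:

```java
import org.apache.spark.SparkConf;

public class ShuffleCacheConfExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .set("spark.shuffle.service.enabled", "true")
        // New in this PR: cap the index cache by memory footprint, not entry count.
        .set("spark.shuffle.service.index.cache.size", "100m");
    // The old key, spark.shuffle.service.index.cache.entries, is deprecated
    // as of 2.3.0 and would only produce a warning.
    System.out.println(conf.get("spark.shuffle.service.index.cache.size"));
  }
}
```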