Skip to content

Commit

Permalink
[SPARK-29469][SHUFFLE] Avoid retries by RetryingBlockFetcher when Ext…
Browse files Browse the repository at this point in the history
…ernalBlockStoreClient is closed

### What changes were proposed in this pull request?

When ExternalBlockStoreClient was closed, retries from RetryingBlockFetcher will cause NPE. This proposes to skip retries by RetryingBlockFetcher when ExternalBlockStoreClient is closed.

### Why are the changes needed?

When ExternalBlockStoreClient was closed, retries from RetryingBlockFetcher will cause NPE:

```
2019-10-14 20:06:16 ERROR RetryingBlockFetcher:143 - Exception while beginning fetch of 2 outstanding blocks (after 3 retries)
java.lang.NullPointerException
at org.apache.spark.network.shuffle.ExternalShuffleClient.lambda$fetchBlocks$0(ExternalShuffleClient.java:100)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.lambda$initiateRetry$0(RetryingBlockFetcher.java:169)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
```

It was happened after BlockManager and ExternalBlockStoreClient was closed due to previous errors. In this cases, RetryingBlockFetcher does not need to retry. This NPE is harmless for job execution, but is a source of misleading when looking at log. Especially for end-users.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #26115 from viirya/SPARK-29469.

Lead-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
2 people authored and cloud-fan committed Oct 16, 2019
1 parent e00344e commit 93e71e6
Showing 1 changed file with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
private final SecretKeyHolder secretKeyHolder;
private final long registrationTimeoutMs;

protected TransportClientFactory clientFactory;
protected volatile TransportClientFactory clientFactory;
protected String appId;

/**
Expand Down Expand Up @@ -102,9 +102,14 @@ public void fetchBlocks(
try {
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
(blockIds1, listener1) -> {
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockFetcher(client, appId, execId,
blockIds1, listener1, conf, downloadFileManager).start();
// Unless this client is closed.
if (clientFactory != null) {
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockFetcher(client, appId, execId,
blockIds1, listener1, conf, downloadFileManager).start();
} else {
logger.info("This clientFactory was closed. Skipping further block fetch retries.");
}
};

int maxRetries = conf.maxIORetries();
Expand Down

0 comments on commit 93e71e6

Please sign in to comment.