diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index b8e52c8621fb6..85d278138c2b4 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -53,7 +53,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient { private final SecretKeyHolder secretKeyHolder; private final long registrationTimeoutMs; - protected TransportClientFactory clientFactory; + protected volatile TransportClientFactory clientFactory; protected String appId; /** @@ -102,9 +102,14 @@ public void fetchBlocks( try { RetryingBlockFetcher.BlockFetchStarter blockFetchStarter = (blockIds1, listener1) -> { - TransportClient client = clientFactory.createClient(host, port); - new OneForOneBlockFetcher(client, appId, execId, - blockIds1, listener1, conf, downloadFileManager).start(); + // Unless this client is closed. + if (clientFactory != null) { + TransportClient client = clientFactory.createClient(host, port); + new OneForOneBlockFetcher(client, appId, execId, + blockIds1, listener1, conf, downloadFileManager).start(); + } else { + logger.info("This clientFactory was closed. Skipping further block fetch retries."); + } }; int maxRetries = conf.maxIORetries();