From c77acf0bbc25341de2636649fdd76f9bb4bdf4ed Mon Sep 17 00:00:00 2001
From: Ye Zhou
Date: Tue, 20 Jul 2021 00:03:30 -0500
Subject: [PATCH] [SPARK-35546][SHUFFLE] Enable push-based shuffle when
 multiple app attempts are enabled and manage concurrent access to the state
 in a better way

### What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602, which is needed for push-based shuffle.

### Summary of the change:
When an executor registers with the shuffle service, it encodes the merged shuffle directory it created, together with the application attemptId, into the ShuffleManagerMeta as JSON. The shuffle service then decodes the JSON string to recover the correct merged shuffle directory and attemptId. If the registration comes from a newer attempt, the stored merged shuffle information is updated to that of the newer attempt. This PR also refactors the management of the merged shuffle information to avoid concurrency issues.

### Why are the changes needed?
Refer to the SPIP in SPARK-30602.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added unit tests. The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602. We have already verified the functionality and the improved performance as documented in the SPIP doc.

Closes #33078 from zhouyejoe/SPARK-35546.

Authored-by: Ye Zhou
Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../spark/network/util/TransportConf.java     |   7 +
 .../shuffle/ExternalBlockStoreClient.java     |   7 +-
 .../network/shuffle/OneForOneBlockPusher.java |   8 +-
 .../shuffle/RemoteBlockPushResolver.java      | 546 +++++++++++-------
 .../shuffle/protocol/ExecutorShuffleInfo.java |   7 +-
 .../protocol/FinalizeShuffleMerge.java        |  13 +-
 .../shuffle/protocol/PushBlockStream.java     |  21 +-
 .../shuffle/ExternalBlockHandlerSuite.java    |   2 +-
 .../shuffle/OneForOneBlockPusherSuite.java    |  22 +-
 .../shuffle/RemoteBlockPushResolverSuite.java | 466 +++++++++++----
 .../scala/org/apache/spark/SparkContext.scala |   1 +
 .../spark/internal/config/package.scala       |  10 +
 .../apache/spark/storage/BlockManager.scala   |   9 +-
 .../spark/storage/DiskBlockManager.scala      |  34 +-
 .../scala/org/apache/spark/util/Utils.scala   |  23 +-
 .../spark/storage/DiskBlockManagerSuite.scala |  23 +-
 .../org/apache/spark/util/UtilsSuite.scala    |   2 +-
 17 files changed, 810 insertions(+), 391 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index f051042a7adb4..8e7ecf500ed58 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -419,4 +419,11 @@ public long mergedIndexCacheSize() {
   public int ioExceptionsThresholdDuringMerge() {
     return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4);
   }
+
+  /**
+   * The application attempt ID assigned from Hadoop YARN.
+ */ + public int appAttemptId() { + return conf.getInt("spark.app.attempt.id", -1); + } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index f44140b124615..63bf7871956f6 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -141,8 +141,8 @@ public void pushBlocks( RetryingBlockFetcher.BlockFetchStarter blockPushStarter = (inputBlockId, inputListener) -> { TransportClient client = clientFactory.createClient(host, port); - new OneForOneBlockPusher(client, appId, inputBlockId, inputListener, buffersWithId) - .start(); + new OneForOneBlockPusher(client, appId, conf.appAttemptId(), inputBlockId, + inputListener, buffersWithId).start(); }; int maxRetries = conf.maxIORetries(); if (maxRetries > 0) { @@ -168,7 +168,8 @@ public void finalizeShuffleMerge( checkInit(); try { TransportClient client = clientFactory.createClient(host, port); - ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer(); + ByteBuffer finalizeShuffleMerge = + new FinalizeShuffleMerge(appId, conf.appAttemptId(), shuffleId).toByteBuffer(); client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() { @Override public void onSuccess(ByteBuffer response) { diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java index 6ee95ef0dea01..b8b32e27551dd 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java @@ -45,6 +45,7 @@ public class OneForOneBlockPusher { private final TransportClient client; private final String appId; + private final int appAttemptId; private final String[] blockIds; private final BlockFetchingListener listener; private final Map buffers; @@ -52,11 +53,13 @@ public class OneForOneBlockPusher { public OneForOneBlockPusher( TransportClient client, String appId, + int appAttemptId, String[] blockIds, BlockFetchingListener listener, Map buffers) { this.client = client; this.appId = appId; + this.appAttemptId = appAttemptId; this.blockIds = blockIds; this.listener = listener; this.buffers = buffers; @@ -123,8 +126,9 @@ public void start() { throw new IllegalArgumentException( "Unexpected shuffle push block id format: " + blockIds[i]); } - ByteBuffer header = new PushBlockStream(appId, Integer.parseInt(blockIdParts[1]), - Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]) , i).toByteBuffer(); + ByteBuffer header = + new PushBlockStream(appId, appAttemptId, Integer.parseInt(blockIdParts[1]), + Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]) , i).toByteBuffer(); client.uploadStream(new NioManagedBuffer(header), buffers.get(blockIds[i]), new BlockPushCallback(i, blockIds[i])); } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 47d25479d9b54..f88cfee105060 100644 --- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -28,27 +28,26 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.cache.Weigher; -import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,14 +72,22 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { private static final Logger logger = LoggerFactory.getLogger(RemoteBlockPushResolver.class); - @VisibleForTesting - static final String MERGE_MANAGER_DIR = "merge_manager"; + public static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged"; + public static final String SHUFFLE_META_DELIMITER = ":"; + public static final String MERGE_DIR_KEY = "mergeDir"; + public static final String ATTEMPT_ID_KEY = "attemptId"; + private static final int UNDEFINED_ATTEMPT_ID = -1; - private final ConcurrentMap appsPathInfo; - private final ConcurrentMap> partitions; + /** + * A concurrent hashmap where the key is the applicationId, and the value includes + * all the merged shuffle information for this application. AppShuffleInfo stores + * the application attemptId, merged shuffle local directories and the metadata + * for actively being merged shuffle partitions. + */ + private final ConcurrentMap appsShuffleInfo; - private final Executor directoryCleaner; + private final Executor mergedShuffleCleaner; private final TransportConf conf; private final int minChunkSize; private final int ioExceptionsThresholdDuringMerge; @@ -92,9 +99,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { @SuppressWarnings("UnstableApiUsage") public RemoteBlockPushResolver(TransportConf conf) { this.conf = conf; - this.partitions = Maps.newConcurrentMap(); - this.appsPathInfo = Maps.newConcurrentMap(); - this.directoryCleaner = Executors.newSingleThreadExecutor( + this.appsShuffleInfo = new ConcurrentHashMap<>(); + this.mergedShuffleCleaner = Executors.newSingleThreadExecutor( // Add `spark` prefix because it will run in NM in Yarn mode. 
NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner")); this.minChunkSize = conf.minChunkSizeInMergedShuffleFile(); @@ -112,34 +118,59 @@ public ShuffleIndexInformation load(File file) throws IOException { this.errorHandler = new ErrorHandler.BlockPushErrorHandler(); } + @VisibleForTesting + protected AppShuffleInfo validateAndGetAppShuffleInfo(String appId) { + // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart + AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(appId); + Preconditions.checkArgument(appShuffleInfo != null, + "application " + appId + " is not registered or NM was restarted."); + return appShuffleInfo; + } + /** - * Given the appShuffleId and reduceId that uniquely identifies a given shuffle partition of an - * application, retrieves the associated metadata. If not present and the corresponding merged - * shuffle does not exist, initializes the metadata. + * Given the appShuffleInfo, shuffleId and reduceId that uniquely identifies a given shuffle + * partition of an application, retrieves the associated metadata. If not present and the + * corresponding merged shuffle does not exist, initializes the metadata. */ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo( - AppShuffleId appShuffleId, + AppShuffleInfo appShuffleInfo, + int shuffleId, int reduceId) { - File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId); - if (!partitions.containsKey(appShuffleId) && dataFile.exists()) { - // If this partition is already finalized then the partitions map will not contain - // the appShuffleId but the data file would exist. In that case the block is considered late. + File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, reduceId); + ConcurrentMap> partitions = + appShuffleInfo.partitions; + Map shufflePartitions = + partitions.compute(shuffleId, (id, map) -> { + if (map == null) { + // If this partition is already finalized then the partitions map will not contain the + // shuffleId but the data file would exist. In that case the block is considered late. + if (dataFile.exists()) { + return null; + } + return new ConcurrentHashMap<>(); + } else { + return map; + } + }); + if (shufflePartitions == null) { return null; } - Map shufflePartitions = - partitions.computeIfAbsent(appShuffleId, id -> Maps.newConcurrentMap()); + return shufflePartitions.computeIfAbsent(reduceId, key -> { // It only gets here when the key is not present in the map. This could either // be the first time the merge manager receives a pushed block for a given application // shuffle partition, or after the merged shuffle file is finalized. We handle these // two cases accordingly by checking if the file already exists. 
- File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId); - File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId); + File indexFile = + appShuffleInfo.getMergedShuffleIndexFile(shuffleId, reduceId); + File metaFile = + appShuffleInfo.getMergedShuffleMetaFile(shuffleId, reduceId); try { if (dataFile.exists()) { return null; } else { - return newAppShufflePartitionInfo(appShuffleId, reduceId, dataFile, indexFile, metaFile); + return newAppShufflePartitionInfo( + appShuffleInfo.appId, shuffleId, reduceId, dataFile, indexFile, metaFile); } } catch (IOException e) { logger.error( @@ -148,26 +179,28 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo( indexFile.getAbsolutePath(), metaFile.getAbsolutePath()); throw new RuntimeException( String.format("Cannot initialize merged shuffle partition for appId %s shuffleId %s " - + "reduceId %s", appShuffleId.appId, appShuffleId.shuffleId, reduceId), e); + + "reduceId %s", appShuffleInfo.appId, shuffleId, reduceId), e); } }); } @VisibleForTesting AppShufflePartitionInfo newAppShufflePartitionInfo( - AppShuffleId appShuffleId, + String appId, + int shuffleId, int reduceId, File dataFile, File indexFile, File metaFile) throws IOException { - return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, + return new AppShufflePartitionInfo(appId, shuffleId, reduceId, dataFile, new MergeShuffleFile(indexFile), new MergeShuffleFile(metaFile)); } @Override public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduceId) { - AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId); - File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + File indexFile = + appShuffleInfo.getMergedShuffleIndexFile(shuffleId, reduceId); if (!indexFile.exists()) { throw new RuntimeException(String.format( "Merged shuffle index file %s not found", indexFile.getPath())); @@ -175,7 +208,7 @@ public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduc int size = (int) indexFile.length(); // First entry is the zero offset int numChunks = (size / Long.BYTES) - 1; - File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId); + File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, reduceId); if (!metaFile.exists()) { throw new RuntimeException(String.format("Merged shuffle meta file %s not found", metaFile.getPath())); @@ -190,13 +223,14 @@ public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduc @SuppressWarnings("UnstableApiUsage") @Override public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceId, int chunkId) { - AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId); - File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, reduceId); if (!dataFile.exists()) { throw new RuntimeException(String.format("Merged shuffle data file %s not found", dataFile.getPath())); } - File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId); + File indexFile = + appShuffleInfo.getMergedShuffleIndexFile(shuffleId, reduceId); try { // If we get here, the merged shuffle file should have been properly finalized. Thus we can // use the file length to determine the size of the merged shuffle block. 
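As the surrounding hunks note, a merged shuffle index file is just a sequence of long offsets whose first entry is zero, which is why `getMergedBlockMeta` computes `numChunks = (size / Long.BYTES) - 1`. A minimal standalone sketch of recovering chunk sizes from such an index file; the helper class and file handling are illustrative, not the resolver's actual code:

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

public class MergedIndexSketch {
  // Chunk i of the merged data file spans [offset[i], offset[i+1]),
  // so chunk sizes fall out of consecutive differences of the stored offsets.
  public static long[] chunkSizes(File indexFile) throws IOException {
    int numChunks = (int) (indexFile.length() / Long.BYTES) - 1;
    long[] sizes = new long[numChunks];
    try (DataInputStream in = new DataInputStream(new FileInputStream(indexFile))) {
      long prev = in.readLong();  // first entry is the zero offset
      for (int i = 0; i < numChunks; i++) {
        long next = in.readLong();
        sizes[i] = next - prev;
        prev = next;
      }
    }
    return sizes;
  }
}
```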
@@ -210,76 +244,51 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI } } - /** - * The logic here is consistent with - * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile( - * org.apache.spark.storage.BlockId, scala.Option)]] - */ - private File getFile(String appId, String filename) { - // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId), - "application " + appId + " is not registered or NM was restarted."); - File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs, - appPathsInfo.subDirsPerLocalDir, filename); - logger.debug("Get merged file {}", targetFile.getAbsolutePath()); - return targetFile; - } - - private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) { - String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, fileName); - } - - private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) { - String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, indexName); - } - - private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) { - String metaName = String.format("%s.meta", generateFileName(appShuffleId, reduceId)); - return getFile(appShuffleId.appId, metaName); - } - @Override public String[] getMergedBlockDirs(String appId) { - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId), - "application " + appId + " is not registered or NM was restarted."); - String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs, - "application " + appId - + " active local dirs list has not been updated by any executor registration"); - return activeLocalDirs; + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + return appShuffleInfo.appPathsInfo.activeLocalDirs; } @Override public void applicationRemoved(String appId, boolean cleanupLocalDirs) { logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs); - // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart - AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.remove(appId), - "application " + appId + " is not registered or NM was restarted."); - Iterator>> iterator = - partitions.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry> entry = iterator.next(); - AppShuffleId appShuffleId = entry.getKey(); - if (appId.equals(appShuffleId.appId)) { - iterator.remove(); - for (AppShufflePartitionInfo partitionInfo : entry.getValue().values()) { + AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId); + if (null != appShuffleInfo) { + mergedShuffleCleaner.execute( + () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs)); + } + } + + + /** + * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo. + * If cleanupLocalDirs is true, the merged shuffle files will also be deleted. + * The cleanup will be executed in a separate thread. 
+ */ + @VisibleForTesting + void closeAndDeletePartitionFilesIfNeeded( + AppShuffleInfo appShuffleInfo, + boolean cleanupLocalDirs) { + for (Map partitionMap : appShuffleInfo.partitions.values()) { + for (AppShufflePartitionInfo partitionInfo : partitionMap.values()) { + synchronized (partitionInfo) { partitionInfo.closeAllFiles(); } } } if (cleanupLocalDirs) { - Path[] dirs = Arrays.stream(appPathsInfo.activeLocalDirs) - .map(dir -> Paths.get(dir)).toArray(Path[]::new); - directoryCleaner.execute(() -> deleteExecutorDirs(dirs)); + deleteExecutorDirs(appShuffleInfo); } } /** - * Serially delete local dirs, executed in a separate thread. + * Serially delete local dirs. */ @VisibleForTesting - void deleteExecutorDirs(Path[] dirs) { + void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) { + Path[] dirs = Arrays.stream(appShuffleInfo.appPathsInfo.activeLocalDirs) + .map(dir -> Paths.get(dir)).toArray(Path[]::new); for (Path localDir : dirs) { try { if (Files.exists(localDir)) { @@ -294,10 +303,22 @@ void deleteExecutorDirs(Path[] dirs) { @Override public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) { + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId); + final String streamId = String.format("%s_%d_%d_%d", + OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, msg.mapIndex, + msg.reduceId); + if (appShuffleInfo.attemptId != msg.appAttemptId) { + // If this Block belongs to a former application attempt, it is considered late, + // as only the blocks from the current application attempt will be merged + // TODO: [SPARK-35548] Client should be updated to handle this error. + throw new IllegalArgumentException( + String.format("The attempt id %s in this PushBlockStream message does not match " + + "with the current attempt id %s stored in shuffle service for application %s", + msg.appAttemptId, appShuffleInfo.attemptId, msg.appId)); + } // Retrieve merged shuffle file metadata - AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId); AppShufflePartitionInfo partitionInfoBeforeCheck = - getOrCreateAppShufflePartitionInfo(appShuffleId, msg.reduceId); + getOrCreateAppShufflePartitionInfo(appShuffleInfo, msg.shuffleId, msg.reduceId); // Here partitionInfo will be null in 2 cases: // 1) The request is received for a block that has already been merged, this is possible due // to the retry logic. @@ -338,11 +359,9 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) { final AppShufflePartitionInfo partitionInfo = partitionInfoBeforeCheck != null && partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null : partitionInfoBeforeCheck; - final String streamId = String.format("%s_%d_%d_%d", - OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, appShuffleId.shuffleId, msg.mapIndex, - msg.reduceId); if (partitionInfo != null) { - return new PushBlockStreamCallback(this, streamId, partitionInfo, msg.mapIndex); + return new PushBlockStreamCallback( + this, appShuffleInfo, streamId, partitionInfo, msg.mapIndex); } else { // For a duplicate block or a block which is late, respond back with a callback that handles // them differently. 
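The `receiveBlockDataAsStream` and `registerExecutor` changes below hinge on the shuffleManager string carrying a JSON suffix after a `:` delimiter. A hedged sketch of that encode/decode round trip with Jackson; the literal values are made up for illustration, and this is not the service's exact parsing code:

```java
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ShuffleMetaRoundTrip {
  public static void main(String[] args) throws Exception {
    // Executor side: shuffle manager class name, ':' delimiter, then JSON metadata.
    String shuffleManagerMeta = "org.apache.spark.shuffle.sort.SortShuffleManager"
        + ":{\"mergeDir\": \"merge_manager_1\", \"attemptId\": \"1\"}";

    // Server side: everything after the first ':' is the JSON payload.
    String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(':') + 1);
    Map<String, String> metaMap = new ObjectMapper()
        .readValue(mergeDirInfo, new TypeReference<Map<String, String>>() {});

    String mergeDir = metaMap.get("mergeDir");                          // "merge_manager_1"
    int attemptId = Integer.parseInt(metaMap.getOrDefault("attemptId", "-1"));
    System.out.println(mergeDir + ", attempt " + attemptId);
  }
}
```

A registration without the delimiter keeps the legacy plain class name, which is why the server below logs a warning for a missing suffix rather than failing the registration.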
@@ -377,24 +396,31 @@ public void onFailure(String streamId, Throwable cause) { } } - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") @Override public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException { - logger.info("Finalizing shuffle {} from Application {}.", msg.shuffleId, msg.appId); - AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId); - Map shufflePartitions = partitions.get(appShuffleId); + logger.info("Finalizing shuffle {} from Application {}_{}.", + msg.shuffleId, msg.appId, msg.appAttemptId); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId); + if (appShuffleInfo.attemptId != msg.appAttemptId) { + // If this Block belongs to a former application attempt, it is considered late, + // as only the blocks from the current application attempt will be merged + // TODO: [SPARK-35548] Client should be updated to handle this error. + throw new IllegalArgumentException( + String.format("The attempt id %s in this FinalizeShuffleMerge message does not match " + + "with the current attempt id %s stored in shuffle service for application %s", + msg.appAttemptId, appShuffleInfo.attemptId, msg.appId)); + } + Map shufflePartitions = + appShuffleInfo.partitions.remove(msg.shuffleId); MergeStatuses mergeStatuses; if (shufflePartitions == null || shufflePartitions.isEmpty()) { mergeStatuses = new MergeStatuses(msg.shuffleId, new RoaringBitmap[0], new int[0], new long[0]); } else { - Collection partitionsToFinalize = shufflePartitions.values(); - List bitmaps = new ArrayList<>(partitionsToFinalize.size()); - List reduceIds = new ArrayList<>(partitionsToFinalize.size()); - List sizes = new ArrayList<>(partitionsToFinalize.size()); - Iterator partitionsIter = partitionsToFinalize.iterator(); - while (partitionsIter.hasNext()) { - AppShufflePartitionInfo partition = partitionsIter.next(); + List bitmaps = new ArrayList<>(shufflePartitions.size()); + List reduceIds = new ArrayList<>(shufflePartitions.size()); + List sizes = new ArrayList<>(shufflePartitions.size()); + for (AppShufflePartitionInfo partition: shufflePartitions.values()) { synchronized (partition) { try { // This can throw IOException which will marks this shuffle partition as not merged. @@ -403,13 +429,10 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc reduceIds.add(partition.reduceId); sizes.add(partition.getLastChunkOffset()); } catch (IOException ioe) { - logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId, - msg.shuffleId, partition.reduceId, ioe); + logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId, + msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe); } finally { partition.closeAllFiles(); - // The partition should be removed after the files are written so that any new stream - // for the same reduce partition will see that the data file exists. 
-          partitionsIter.remove();
         }
       }
     }
@@ -417,8 +440,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId);
+    logger.info("Finalized shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.appAttemptId);
     return mergeStatuses;
   }
 
@@ -426,15 +449,68 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-        executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs),
+        executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(SHUFFLE_META_DELIMITER)) {
+      String mergeDirInfo =
+        shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(SHUFFLE_META_DELIMITER) + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        TypeReference<Map<String, String>> typeRef
+          = new TypeReference<Map<String, String>>(){};
+        Map<String, String> metaMap = mapper.readValue(mergeDirInfo, typeRef);
+        String mergeDir = metaMap.get(MERGE_DIR_KEY);
+        int attemptId = Integer.valueOf(
+          metaMap.getOrDefault(ATTEMPT_ID_KEY, String.valueOf(UNDEFINED_ATTEMPT_ID)));
+        if (mergeDir == null) {
+          throw new IllegalArgumentException(
+            String.format("Failed to get the merge directory information from the " +
+              "shuffleManagerMeta %s in executor registration message", shuffleManagerMeta));
+        }
+        if (attemptId == UNDEFINED_ATTEMPT_ID) {
+          // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge dirs
+          appsShuffleInfo.computeIfAbsent(appId, id ->
+            new AppShuffleInfo(
+              appId, UNDEFINED_ATTEMPT_ID,
+              new AppPathsInfo(appId, executorInfo.localDirs,
+                mergeDir, executorInfo.subDirsPerLocalDir)
+            ));
+        } else {
+          // If attemptId is not -1, there is an attemptId stored in the ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application attempt will register
+          // the merge dirs in the External Shuffle Service. Any later ExecutorRegister message
+          // from the same application attempt will not override the merge dirs. But it can
+          // be overridden by an ExecutorRegister message from a newer application attempt,
+          // and the former attempts' shuffle partition information will also be cleaned up.
+ AtomicReference originalAppShuffleInfo = new AtomicReference<>(); + appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> { + if (appShuffleInfo == null || attemptId > appShuffleInfo.attemptId) { + originalAppShuffleInfo.set(appShuffleInfo); + appShuffleInfo = + new AppShuffleInfo( + appId, attemptId, + new AppPathsInfo(appId, executorInfo.localDirs, + mergeDir, executorInfo.subDirsPerLocalDir)); + } + return appShuffleInfo; + }); + if (originalAppShuffleInfo.get() != null) { + AppShuffleInfo appShuffleInfo = originalAppShuffleInfo.get(); + logger.warn("Cleanup shuffle info and merged shuffle files for {}_{} as new " + + "application attempt registered", appId, appShuffleInfo.attemptId); + mergedShuffleCleaner.execute( + () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, true)); + } + } + } catch (JsonProcessingException e) { + logger.warn("Failed to get the merge directory information from ExecutorShuffleInfo: ", e); + } + } else { + logger.warn("ExecutorShuffleInfo does not have the expected merge directory information"); } - appsPathInfo.computeIfAbsent(appId, id -> new AppPathsInfo(appId, executorInfo.localDirs, - executorInfo.subDirsPerLocalDir)); - } - private static String generateFileName(AppShuffleId appShuffleId, int reduceId) { - return String.format("%s_%s_%d_%d", MERGED_SHUFFLE_FILE_NAME_PREFIX, appShuffleId.appId, - appShuffleId.shuffleId, reduceId); } /** @@ -443,6 +519,7 @@ private static String generateFileName(AppShuffleId appShuffleId, int reduceId) static class PushBlockStreamCallback implements StreamCallbackWithID { private final RemoteBlockPushResolver mergeManager; + private final AppShuffleInfo appShuffleInfo; private final String streamId; private final int mapIndex; private final AppShufflePartitionInfo partitionInfo; @@ -457,12 +534,17 @@ static class PushBlockStreamCallback implements StreamCallbackWithID { private PushBlockStreamCallback( RemoteBlockPushResolver mergeManager, + AppShuffleInfo appShuffleInfo, String streamId, AppShufflePartitionInfo partitionInfo, int mapIndex) { - this.mergeManager = Preconditions.checkNotNull(mergeManager); + Preconditions.checkArgument(mergeManager != null); + this.mergeManager = mergeManager; + Preconditions.checkArgument(appShuffleInfo != null); + this.appShuffleInfo = appShuffleInfo; this.streamId = streamId; - this.partitionInfo = Preconditions.checkNotNull(partitionInfo); + Preconditions.checkArgument(partitionInfo != null); + this.partitionInfo = partitionInfo; this.mapIndex = mapIndex; abortIfNecessary(); } @@ -482,7 +564,7 @@ private void writeBuf(ByteBuffer buf) throws IOException { while (buf.hasRemaining()) { long updatedPos = partitionInfo.getDataFilePos() + length; logger.debug("{} shuffleId {} reduceId {} current pos {} updated pos {}", - partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, + partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos); length += partitionInfo.dataChannel.write(buf, updatedPos); } @@ -567,7 +649,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { // memory, while still providing the necessary guarantee. synchronized (partitionInfo) { Map shufflePartitions = - mergeManager.partitions.get(partitionInfo.appShuffleId); + appShuffleInfo.partitions.get(partitionInfo.shuffleId); // If the partitionInfo corresponding to (appId, shuffleId, reduceId) is no longer present // then it means that the shuffle merge has already been finalized. 
We should thus ignore // the data and just drain the remaining bytes of this message. This check should be @@ -587,7 +669,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { } abortIfNecessary(); logger.trace("{} shuffleId {} reduceId {} onData writable", - partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, + partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.reduceId); if (partitionInfo.getCurrentMapIndex() < 0) { partitionInfo.setCurrentMapIndex(mapIndex); @@ -609,7 +691,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { } } else { logger.trace("{} shuffleId {} reduceId {} onData deferred", - partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, + partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.reduceId); // If we cannot write to disk, we buffer the current block chunk in memory so it could // potentially be written to disk later. We take our best effort without guarantee @@ -644,10 +726,10 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { public void onComplete(String streamId) throws IOException { synchronized (partitionInfo) { logger.trace("{} shuffleId {} reduceId {} onComplete invoked", - partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, + partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.reduceId); Map shufflePartitions = - mergeManager.partitions.get(partitionInfo.appShuffleId); + appShuffleInfo.partitions.get(partitionInfo.shuffleId); // When this request initially got to the server, the shuffle merge finalize request // was not received yet. By the time we finish reading this message, the shuffle merge // however is already finalized. We should thus respond RpcFailure to the client. @@ -724,10 +806,10 @@ public void onFailure(String streamId, Throwable throwable) throws IOException { if (isWriting) { synchronized (partitionInfo) { Map shufflePartitions = - mergeManager.partitions.get(partitionInfo.appShuffleId); + appShuffleInfo.partitions.get(partitionInfo.shuffleId); if (shufflePartitions != null && shufflePartitions.containsKey(partitionInfo.reduceId)) { logger.debug("{} shuffleId {} reduceId {} encountered failure", - partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, + partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.reduceId); partitionInfo.setCurrentMapIndex(-1); } @@ -742,63 +824,25 @@ AppShufflePartitionInfo getPartitionInfo() { } } - /** - * ID that uniquely identifies a shuffle for an application. This is used as a key in - * {@link #partitions}. 
- */ - public static class AppShuffleId { - public final String appId; - public final int shuffleId; - - AppShuffleId(String appId, int shuffleId) { - this.appId = appId; - this.shuffleId = shuffleId; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - AppShuffleId that = (AppShuffleId) o; - return shuffleId == that.shuffleId && Objects.equal(appId, that.appId); - } - - @Override - public int hashCode() { - return Objects.hashCode(appId, shuffleId); - } - - @Override - public String toString() { - return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) - .append("appId", appId) - .append("shuffleId", shuffleId) - .toString(); - } - } - /** Metadata tracked for an actively merged shuffle partition */ public static class AppShufflePartitionInfo { - private final AppShuffleId appShuffleId; + private final String appId; + private final int shuffleId; private final int reduceId; // The merged shuffle data file channel - public FileChannel dataChannel; + public final FileChannel dataChannel; + // The index file for a particular merged shuffle contains the chunk offsets. + private final MergeShuffleFile indexFile; + // The meta file for a particular merged shuffle contains all the map indices that belong to + // every chunk. The entry per chunk is a serialized bitmap. + private final MergeShuffleFile metaFile; // Location offset of the last successfully merged block for this shuffle partition private long dataFilePos; // Track the map index whose block is being merged for this shuffle partition private int currentMapIndex; // Bitmap tracking which mapper's blocks have been merged for this shuffle partition private RoaringBitmap mapTracker; - // The index file for a particular merged shuffle contains the chunk offsets. - private MergeShuffleFile indexFile; - // The meta file for a particular merged shuffle contains all the map indices that belong to - // every chunk. The entry per chunk is a serialized bitmap. 
- private MergeShuffleFile metaFile; // The offset for the last chunk tracked in the index file for this shuffle partition private long lastChunkOffset; private int lastMergedMapIndex = -1; @@ -808,12 +852,15 @@ public static class AppShufflePartitionInfo { private boolean indexMetaUpdateFailed; AppShufflePartitionInfo( - AppShuffleId appShuffleId, + String appId, + int shuffleId, int reduceId, File dataFile, MergeShuffleFile indexFile, MergeShuffleFile metaFile) throws IOException { - this.appShuffleId = Preconditions.checkNotNull(appShuffleId, "app shuffle id"); + Preconditions.checkArgument(appId != null, "app id is null"); + this.appId = appId; + this.shuffleId = shuffleId; this.reduceId = reduceId; this.dataChannel = new FileOutputStream(dataFile).getChannel(); this.indexFile = indexFile; @@ -831,8 +878,8 @@ public long getDataFilePos() { } public void setDataFilePos(long dataFilePos) { - logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", appShuffleId.appId, - appShuffleId.shuffleId, reduceId, this.dataFilePos, dataFilePos); + logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", appId, + shuffleId, reduceId, this.dataFilePos, dataFilePos); this.dataFilePos = dataFilePos; } @@ -842,7 +889,7 @@ int getCurrentMapIndex() { void setCurrentMapIndex(int mapIndex) { logger.trace("{} shuffleId {} reduceId {} updated mapIndex {} current mapIndex {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId, currentMapIndex, mapIndex); + appId, shuffleId, reduceId, currentMapIndex, mapIndex); this.currentMapIndex = mapIndex; } @@ -851,8 +898,8 @@ long getLastChunkOffset() { } void blockMerged(int mapIndex) { - logger.debug("{} shuffleId {} reduceId {} updated merging mapIndex {}", appShuffleId.appId, - appShuffleId.shuffleId, reduceId, mapIndex); + logger.debug("{} shuffleId {} reduceId {} updated merging mapIndex {}", appId, + shuffleId, reduceId, mapIndex); mapTracker.add(mapIndex); chunkTracker.add(mapIndex); lastMergedMapIndex = mapIndex; @@ -871,7 +918,7 @@ void resetChunkTracker() { void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException { try { logger.trace("{} shuffleId {} reduceId {} index current {} updated {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId, this.lastChunkOffset, chunkOffset); + appId, shuffleId, reduceId, this.lastChunkOffset, chunkOffset); if (indexMetaUpdateFailed) { indexFile.getChannel().position(indexFile.getPos()); } @@ -885,8 +932,8 @@ void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException { this.lastChunkOffset = chunkOffset; indexMetaUpdateFailed = false; } catch (IOException ioe) { - logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", appShuffleId.appId, - appShuffleId.shuffleId, reduceId); + logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", appId, + shuffleId, reduceId); indexMetaUpdateFailed = true; // Any exception here is propagated to the caller and the caller can decide whether to // abort or not. 
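Per the comments above, each meta-file entry is a serialized RoaringBitmap recording which map indices were merged into one chunk. A self-contained sketch of that serialize/deserialize round trip, assuming the `org.roaringbitmap` library; this mirrors the idea behind `writeChunkTracker`, not the service's exact I/O path:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.roaringbitmap.RoaringBitmap;

public class ChunkTrackerRoundTrip {
  public static void main(String[] args) throws IOException {
    // Map indices whose blocks were merged into a single chunk.
    RoaringBitmap chunkTracker = RoaringBitmap.bitmapOf(0, 2, 5);

    // Write it the way a meta-file entry would be appended.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    chunkTracker.serialize(new DataOutputStream(bytes));

    // Reading it back recovers which mappers contributed to the chunk.
    RoaringBitmap recovered = new RoaringBitmap();
    recovered.deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(recovered);  // {0,2,5}
  }
}
```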
@@ -900,7 +947,7 @@ private void writeChunkTracker(int mapIndex) throws IOException { } chunkTracker.add(mapIndex); logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to meta file", - appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex); + appId, shuffleId, reduceId, mapIndex); if (indexMetaUpdateFailed) { metaFile.getChannel().position(metaFile.getPos()); } @@ -934,35 +981,25 @@ private void finalizePartition() throws IOException { } void closeAllFiles() { - if (dataChannel != null) { - try { + try { + if (dataChannel.isOpen()) { dataChannel.close(); - } catch (IOException ioe) { - logger.warn("Error closing data channel for {} shuffleId {} reduceId {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId); - } finally { - dataChannel = null; } + } catch (IOException ioe) { + logger.warn("Error closing data channel for {} shuffleId {} reduceId {}", + appId, shuffleId, reduceId); } - if (metaFile != null) { - try { - metaFile.close(); - } catch (IOException ioe) { - logger.warn("Error closing meta file for {} shuffleId {} reduceId {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId); - } finally { - metaFile = null; - } + try { + metaFile.close(); + } catch (IOException ioe) { + logger.warn("Error closing meta file for {} shuffleId {} reduceId {}", + appId, shuffleId, reduceId); } - if (indexFile != null) { - try { - indexFile.close(); - } catch (IOException ioe) { - logger.warn("Error closing index file for {} shuffleId {} reduceId {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId); - } finally { - indexFile = null; - } + try { + indexFile.close(); + } catch (IOException ioe) { + logger.warn("Error closing index file for {} shuffleId {} reduceId {}", + appId, shuffleId, reduceId); } } @@ -1003,14 +1040,16 @@ private static class AppPathsInfo { private AppPathsInfo( String appId, String[] localDirs, + String mergeDirectory, int subDirsPerLocalDir) { activeLocalDirs = Arrays.stream(localDirs) .map(localDir -> // Merge directory is created at the same level as block-manager directory. The list of - // local directories that we get from executorShuffleInfo are paths of each - // block-manager directory. To find out the merge directory location, we first find the - // parent dir and then append the "merger_manager" directory to it. - Paths.get(localDir).getParent().resolve(MERGE_MANAGER_DIR).toFile().getPath()) + // local directories that we get from ExecutorShuffleInfo are paths of each + // block-manager directory. The mergeDirectory is the merge directory name that we get + // from ExecutorShuffleInfo. To find out the merge directory location, we first find the + // parent dir of the block-manager directory and then append merge directory name to it. 
+            Paths.get(localDir).getParent().resolve(mergeDirectory).toFile().getPath())
         .toArray(String[]::new);
       this.subDirsPerLocalDir = subDirsPerLocalDir;
       if (logger.isInfoEnabled()) {
@@ -1020,10 +1059,76 @@ private AppPathsInfo(
     }
   }
 
+  /** Merged shuffle related information tracked for a specific application attempt */
+  public static class AppShuffleInfo {
+
+    private final String appId;
+    private final int attemptId;
+    private final AppPathsInfo appPathsInfo;
+    private final ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions;
+
+    AppShuffleInfo(
+        String appId,
+        int attemptId,
+        AppPathsInfo appPathsInfo) {
+      this.appId = appId;
+      this.attemptId = attemptId;
+      this.appPathsInfo = appPathsInfo;
+      partitions = new ConcurrentHashMap<>();
+    }
+
+    @VisibleForTesting
+    public ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> getPartitions() {
+      return partitions;
+    }
+
+    /**
+     * The logic here is consistent with
+     * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
+     *      org.apache.spark.storage.BlockId, scala.Option)]]
+     */
+    private File getFile(String filename) {
+      // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
+      File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
+        appPathsInfo.subDirsPerLocalDir, filename);
+      logger.debug("Get merged file {}", targetFile.getAbsolutePath());
+      return targetFile;
+    }
+
+    private String generateFileName(
+        String appId,
+        int shuffleId,
+        int reduceId) {
+      return String.format(
+        "%s_%s_%d_%d", MERGED_SHUFFLE_FILE_NAME_PREFIX, appId, shuffleId, reduceId);
+    }
+
+    public File getMergedShuffleDataFile(
+        int shuffleId,
+        int reduceId) {
+      String fileName = String.format("%s.data", generateFileName(appId, shuffleId, reduceId));
+      return getFile(fileName);
+    }
+
+    public File getMergedShuffleIndexFile(
+        int shuffleId,
+        int reduceId) {
+      String indexName = String.format("%s.index", generateFileName(appId, shuffleId, reduceId));
+      return getFile(indexName);
+    }
+
+    public File getMergedShuffleMetaFile(
+        int shuffleId,
+        int reduceId) {
+      String metaName = String.format("%s.meta", generateFileName(appId, shuffleId, reduceId));
+      return getFile(metaName);
+    }
+  }
+
   @VisibleForTesting
   static class MergeShuffleFile {
-    private FileChannel channel;
-    private DataOutputStream dos;
+    private final FileChannel channel;
+    private final DataOutputStream dos;
     private long pos;
 
     @VisibleForTesting
@@ -1044,11 +1149,8 @@ private void updatePos(long numBytes) {
     }
 
     void close() throws IOException {
-      try {
+      if (channel.isOpen()) {
         dos.close();
-      } finally {
-        dos = null;
-        channel = null;
       }
     }
 
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
index b4e7bc409d3b8..f123ccb663377 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
@@ -35,7 +35,12 @@ public class ExecutorShuffleInfo implements Encodable {
   public final String[] localDirs;
   /** Number of subdirectories created within each localDir. */
   public final int subDirsPerLocalDir;
-  /** Shuffle manager (SortShuffleManager) that the executor is using. */
+  /**
+   * Shuffle manager (SortShuffleManager) that the executor is using.
+   * If this string contains a colon, it will also include the meta information
+   * for push based shuffle in JSON format.
+   * Example of the string with a colon would be:
+   * SortShuffleManager:{"mergeDir": "mergeDirectory_1", "attemptId": 1}
+   */
   public final String shuffleManager;
 
   @JsonCreator
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
index 31efbb727b20f..f6ab78b1ab7d4 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
@@ -32,12 +32,15 @@
  */
 public class FinalizeShuffleMerge extends BlockTransferMessage {
   public final String appId;
+  public final int appAttemptId;
   public final int shuffleId;
 
   public FinalizeShuffleMerge(
       String appId,
+      int appAttemptId,
       int shuffleId) {
     this.appId = appId;
+    this.appAttemptId = appAttemptId;
     this.shuffleId = shuffleId;
   }
 
@@ -48,13 +51,14 @@ protected BlockTransferMessage.Type type() {
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(appId, shuffleId);
+    return Objects.hashCode(appId, appAttemptId, shuffleId);
   }
 
   @Override
   public String toString() {
     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
       .append("appId", appId)
+      .append("attemptId", appAttemptId)
       .append("shuffleId", shuffleId)
       .toString();
   }
@@ -64,6 +68,7 @@ public boolean equals(Object other) {
     if (other != null && other instanceof FinalizeShuffleMerge) {
       FinalizeShuffleMerge o = (FinalizeShuffleMerge) other;
       return Objects.equal(appId, o.appId)
+        && appAttemptId == o.appAttemptId
         && shuffleId == o.shuffleId;
     }
     return false;
@@ -71,18 +76,20 @@ public boolean equals(Object other) {
 
   @Override
   public int encodedLength() {
-    return Encoders.Strings.encodedLength(appId) + 4;
+    return Encoders.Strings.encodedLength(appId) + 4 + 4;
   }
 
   @Override
   public void encode(ByteBuf buf) {
     Encoders.Strings.encode(buf, appId);
+    buf.writeInt(appAttemptId);
     buf.writeInt(shuffleId);
   }
 
   public static FinalizeShuffleMerge decode(ByteBuf buf) {
     String appId = Encoders.Strings.decode(buf);
+    int attemptId = buf.readInt();
     int shuffleId = buf.readInt();
-    return new FinalizeShuffleMerge(appId, shuffleId);
+    return new FinalizeShuffleMerge(appId, attemptId, shuffleId);
   }
 }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
index 559f88fc4ea0e..d5e1cf2464d9e 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
@@ -19,6 +19,7 @@
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
+
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
@@ -34,6 +35,7 @@
  */
 public class PushBlockStream extends BlockTransferMessage {
   public final String appId;
+  public final int appAttemptId;
   public final int shuffleId;
   public final int mapIndex;
   public final int reduceId;
@@ -41,8 +43,15 @@ public class PushBlockStream extends BlockTransferMessage {
   // blocks to be pushed.
public final int index; - public PushBlockStream(String appId, int shuffleId, int mapIndex, int reduceId, int index) { + public PushBlockStream( + String appId, + int appAttemptId, + int shuffleId, + int mapIndex, + int reduceId, + int index) { this.appId = appId; + this.appAttemptId = appAttemptId; this.shuffleId = shuffleId; this.mapIndex = mapIndex; this.reduceId = reduceId; @@ -56,13 +65,14 @@ protected Type type() { @Override public int hashCode() { - return Objects.hashCode(appId, shuffleId, mapIndex , reduceId, index); + return Objects.hashCode(appId, appAttemptId, shuffleId, mapIndex , reduceId, index); } @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) .append("appId", appId) + .append("attemptId", appAttemptId) .append("shuffleId", shuffleId) .append("mapIndex", mapIndex) .append("reduceId", reduceId) @@ -75,6 +85,7 @@ public boolean equals(Object other) { if (other != null && other instanceof PushBlockStream) { PushBlockStream o = (PushBlockStream) other; return Objects.equal(appId, o.appId) + && appAttemptId == o.appAttemptId && shuffleId == o.shuffleId && mapIndex == o.mapIndex && reduceId == o.reduceId @@ -85,12 +96,13 @@ public boolean equals(Object other) { @Override public int encodedLength() { - return Encoders.Strings.encodedLength(appId) + 16; + return Encoders.Strings.encodedLength(appId) + 4 + 4 + 4 + 4 + 4; } @Override public void encode(ByteBuf buf) { Encoders.Strings.encode(buf, appId); + buf.writeInt(appAttemptId); buf.writeInt(shuffleId); buf.writeInt(mapIndex); buf.writeInt(reduceId); @@ -99,10 +111,11 @@ public void encode(ByteBuf buf) { public static PushBlockStream decode(ByteBuf buf) { String appId = Encoders.Strings.decode(buf); + int attemptId = buf.readInt(); int shuffleId = buf.readInt(); int mapIdx = buf.readInt(); int reduceId = buf.readInt(); int index = buf.readInt(); - return new PushBlockStream(appId, shuffleId, mapIdx, reduceId, index); + return new PushBlockStream(appId, attemptId, shuffleId, mapIdx, reduceId, index); } } diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java index dc41e957f0fcd..00756b1b62887 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java @@ -243,7 +243,7 @@ public void testBadMessages() { public void testFinalizeShuffleMerge() throws IOException { RpcResponseCallback callback = mock(RpcResponseCallback.class); - FinalizeShuffleMerge req = new FinalizeShuffleMerge("app0", 0); + FinalizeShuffleMerge req = new FinalizeShuffleMerge("app0", 1, 0); RoaringBitmap bitmap = RoaringBitmap.bitmapOf(0, 1, 2); MergeStatuses statuses = new MergeStatuses(0, new RoaringBitmap[]{bitmap}, new int[]{3}, new long[]{30}); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java index 46a0f6cf420eb..e41198f8ae309 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java @@ -51,7 +51,7 @@ public void testPushOne() { BlockFetchingListener listener = 
pushBlocks( blocks, blockIds, - Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0))); + Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0))); verify(listener).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any()); } @@ -67,9 +67,9 @@ public void testPushThree() { BlockFetchingListener listener = pushBlocks( blocks, blockIds, - Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0), - new PushBlockStream("app-id", 0, 1, 0, 1), - new PushBlockStream("app-id", 0, 2, 0, 2))); + Arrays.asList(new PushBlockStream("app-id",0, 0, 0, 0, 0), + new PushBlockStream("app-id", 0, 0, 1, 0, 1), + new PushBlockStream("app-id", 0, 0, 2, 0, 2))); verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any()); verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_1_0"), any()); @@ -87,9 +87,9 @@ public void testServerFailures() { BlockFetchingListener listener = pushBlocks( blocks, blockIds, - Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0), - new PushBlockStream("app-id", 0, 1, 0, 1), - new PushBlockStream("app-id", 0, 2, 0, 2))); + Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0), + new PushBlockStream("app-id", 0, 0, 1, 0, 1), + new PushBlockStream("app-id", 0, 0, 2, 0, 2))); verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any()); verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), any()); @@ -107,9 +107,9 @@ public void testHandlingRetriableFailures() { BlockFetchingListener listener = pushBlocks( blocks, blockIds, - Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0), - new PushBlockStream("app-id", 0, 1, 0, 1), - new PushBlockStream("app-id", 0, 2, 0, 2))); + Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0), + new PushBlockStream("app-id", 0, 0, 1, 0, 1), + new PushBlockStream("app-id", 0, 0, 2, 0, 2))); verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any()); verify(listener, times(0)).onBlockFetchSuccess(not(eq("shufflePush_0_0_0")), any()); @@ -130,7 +130,7 @@ private static BlockFetchingListener pushBlocks( TransportClient client = mock(TransportClient.class); BlockFetchingListener listener = mock(BlockFetchingListener.class); OneForOneBlockPusher pusher = - new OneForOneBlockPusher(client, "app-id", blockIds, listener, blocks); + new OneForOneBlockPusher(client, "app-id", 0, blockIds, listener, blocks); Iterator> blockIterator = blocks.entrySet().iterator(); Iterator msgIterator = expectMessages.iterator(); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index 565d433ff3203..2a73aa56b2d28 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -22,11 +22,13 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; @@ -61,6 +63,17 @@ public class RemoteBlockPushResolverSuite { private static final Logger log = LoggerFactory.getLogger(RemoteBlockPushResolverSuite.class); private final 
String TEST_APP = "testApp"; + private final String MERGE_DIRECTORY = "merge_manager"; + private final int NO_ATTEMPT_ID = -1; + private final int ATTEMPT_ID_1 = 1; + private final int ATTEMPT_ID_2 = 2; + private final String MERGE_DIRECTORY_META = "shuffleManager:{\"mergeDir\": \"merge_manager\"}"; + private final String MERGE_DIRECTORY_META_1 = + "shuffleManager:{\"mergeDir\": \"merge_manager_1\", \"attemptId\": \"1\"}"; + private final String MERGE_DIRECTORY_META_2 = + "shuffleManager:{\"mergeDir\": \"merge_manager_2\", \"attemptId\": \"2\"}"; + private final String INVALID_MERGE_DIRECTORY_META = + "shuffleManager:{\"mergeDirInvalid\": \"merge_manager_2\", \"attemptId\": \"2\"}"; private final String BLOCK_MANAGER_DIR = "blockmgr-193d8401"; private TransportConf conf; @@ -74,7 +87,7 @@ public void before() throws IOException { ImmutableMap.of("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "4")); conf = new TransportConf("shuffle", provider); pushResolver = new RemoteBlockPushResolver(conf); - registerExecutor(TEST_APP, prepareLocalDirs(localDirs)); + registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META); } @After @@ -106,9 +119,9 @@ public void testBasicBlockMerge() throws IOException { new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[4])), new PushBlock(0, 1, 0, ByteBuffer.wrap(new byte[5])) }; - pushBlockHelper(TEST_APP, pushBlocks); + pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); validateMergeStatuses(statuses, new int[] {0}, new long[] {9}); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 5}, new int[][]{{0}, {1}}); @@ -122,9 +135,9 @@ public void testDividingMergedBlocksIntoChunks() throws IOException { new PushBlock(0, 2, 0, ByteBuffer.wrap(new byte[5])), new PushBlock(0, 3, 0, ByteBuffer.wrap(new byte[3])) }; - pushBlockHelper(TEST_APP, pushBlocks); + pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); validateMergeStatuses(statuses, new int[] {0}, new long[] {13}); MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, meta, new int[]{5, 5, 3}, new int[][]{{0, 1}, {2}, {3}}); @@ -138,9 +151,9 @@ public void testFinalizeWithMultipleReducePartitions() throws IOException { new PushBlock(0, 0, 1, ByteBuffer.wrap(new byte[5])), new PushBlock(0, 1, 1, ByteBuffer.wrap(new byte[3])) }; - pushBlockHelper(TEST_APP, pushBlocks); + pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); validateMergeStatuses(statuses, new int[] {0, 1}, new long[] {5, 8}); MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, meta, new int[]{5}, new int[][]{{0, 1}}); @@ -149,10 +162,12 @@ public void testFinalizeWithMultipleReducePartitions() throws IOException { @Test public void testDeferredBufsAreWrittenDuringOnData() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new 
PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); // This should be deferred stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3])); // stream 1 now completes stream1.onComplete(stream1.getID()); // stream 2 has more data and then completes stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3])); stream2.onComplete(stream2.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 6}, new int[][]{{0}, {1}}); } @@ -169,10 +184,12 @@ public void testDeferredBufsAreWrittenDuringOnData() throws IOException { @Test public void testDeferredBufsAreWrittenDuringOnComplete() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); // This should be deferred stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3])); stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3])); @@ -181,7 +198,7 @@ public void testDeferredBufsAreWrittenDuringOnComplete() throws IOException { stream1.onComplete(stream1.getID()); // stream 2 now completes stream2.onComplete(stream2.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 6}, new int[][]{{0}, {1}}); } @@ -189,17 +206,19 @@ public void testDeferredBufsAreWrittenDuringOnComplete() throws IOException { @Test public void testDuplicateBlocksAreIgnoredWhenPrevStreamHasCompleted() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); stream1.onComplete(stream1.getID()); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); // This should be ignored stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2])); stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2])); stream2.onComplete(stream2.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta
blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}}); } @@ -207,10 +226,12 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamHasCompleted() throws IOE @Test public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); // This should be ignored stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2])); stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2])); @@ -219,7 +240,7 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE stream1.onComplete(stream1.getID()); // stream 2 now completes stream2.onComplete(stream2.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}}); } @@ -227,10 +248,11 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE @Test public void testFailureAfterData() throws IOException { StreamCallbackWithID stream = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); assertEquals("num-chunks", 0, blockMeta.getNumChunks()); } @@ -238,12 +260,13 @@ public void testFailureAfterData() throws IOException { @Test public void testFailureAfterMultipleDataBlocks() throws IOException { StreamCallbackWithID stream = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2])); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3])); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); assertEquals("num-chunks", 0, blockMeta.getNumChunks()); } @@ -251,39 +274,39 @@ public void testFailureAfterMultipleDataBlocks() throws IOException { @Test public void testFailureAfterComplete() throws IOException { StreamCallbackWithID stream = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); +
pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2])); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3])); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onComplete(stream.getID()); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}}); } - @Test (expected = RuntimeException.class) - public void testTooLateArrival() throws IOException { + @Test(expected = RuntimeException.class) + public void testBlockReceivedAfterMergeFinalize() throws IOException { ByteBuffer[] blocks = new ByteBuffer[]{ ByteBuffer.wrap(new byte[4]), ByteBuffer.wrap(new byte[5]) }; StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); for (ByteBuffer block : blocks) { stream.onData(stream.getID(), block); } stream.onComplete(stream.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4])); try { stream1.onComplete(stream1.getID()); } catch (RuntimeException re) { assertEquals( - "Block shufflePush_0_1_0 received after merged shuffle is finalized", - re.getMessage()); + "Block shufflePush_0_1_0 received after merged shuffle is finalized", re.getMessage()); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}}); throw re; @@ -292,28 +315,31 @@ public void testTooLateArrival() throws IOException { @Test public void testIncompleteStreamsAreOverwritten() throws IOException { - registerExecutor(TEST_APP, prepareLocalDirs(localDirs)); + registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META); byte[] expectedBytes = new byte[4]; ThreadLocalRandom.current().nextBytes(expectedBytes); StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); byte[] data = new byte[10]; ThreadLocalRandom.current().nextBytes(data); stream1.onData(stream1.getID(), ByteBuffer.wrap(data)); // There is a failure stream1.onFailure(stream1.getID(), new RuntimeException("forced error")); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); ByteBuffer nextBuf= ByteBuffer.wrap(expectedBytes, 0, 2); stream2.onData(stream2.getID(), nextBuf); stream2.onComplete(stream2.getID()); StreamCallbackWithID stream3 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 
0, 2, 0, 0)); nextBuf = ByteBuffer.wrap(expectedBytes, 2, 2); stream3.onData(stream3.getID(), nextBuf); stream3.onComplete(stream3.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{1, 2}}); FileSegmentManagedBuffer mb = @@ -321,13 +347,15 @@ public void testIncompleteStreamsAreOverwritten() throws IOException { assertArrayEquals(expectedBytes, mb.nioByteBuffer().array()); } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testCollision() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); // This should be deferred stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5])); // Since stream2 didn't get any opportunity it will throw couldn't find opportunity error @@ -341,17 +369,20 @@ public void testCollision() throws IOException { } } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testFailureInAStreamDoesNotInterfereWithStreamWhichIsWriting() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); // There is a failure with stream2 stream2.onFailure(stream2.getID(), new RuntimeException("forced error")); StreamCallbackWithID stream3 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0)); // This should be deferred stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[5])); // Since this stream didn't get any opportunity it will throw couldn't find opportunity error @@ -368,7 +399,7 @@ public void testFailureInAStreamDoesNotInterfereWithStreamWhichIsWriting() throw stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); stream1.onComplete(stream1.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4}, new int[][] {{0}}); if (failedEx != null) { @@ -376,28 +407,83 @@ public void testFailureInAStreamDoesNotInterfereWithStreamWhichIsWriting() throw } } - @Test(expected = NullPointerException.class) + @Test(expected = IllegalArgumentException.class) public void testUpdateLocalDirsOnlyOnce() throws IOException { String 
testApp = "updateLocalDirsOnlyOnceTest"; Path[] activeLocalDirs = createLocalDirs(1); - registerExecutor(testApp, prepareLocalDirs(activeLocalDirs)); + registerExecutor(testApp, prepareLocalDirs(activeLocalDirs, MERGE_DIRECTORY), + MERGE_DIRECTORY_META); assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1); assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains( activeLocalDirs[0].toFile().getPath())); - // Any later executor register from the same application should not change the active local - // dirs list + // Any later executor register from the same application attempt should not change the active + // local dirs list Path[] updatedLocalDirs = localDirs; - registerExecutor(testApp, prepareLocalDirs(updatedLocalDirs)); + registerExecutor(testApp, prepareLocalDirs(updatedLocalDirs, MERGE_DIRECTORY), + MERGE_DIRECTORY_META); assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1); assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains( activeLocalDirs[0].toFile().getPath())); removeApplication(testApp); try { pushResolver.getMergedBlockDirs(testApp); - } catch (Throwable e) { - assertTrue(e.getMessage() - .startsWith("application " + testApp + " is not registered or NM was restarted.")); - Throwables.propagate(e); + } catch (IllegalArgumentException e) { + assertEquals(e.getMessage(), + "application " + testApp + " is not registered or NM was restarted."); + throw e; + } + } + + @Test(expected = IllegalArgumentException.class) + public void testExecutorRegisterWithInvalidJsonForPushShuffle() throws IOException { + String testApp = "executorRegisterWithInvalidShuffleManagerMeta"; + Path[] activeLocalDirs = createLocalDirs(1); + try { + registerExecutor(testApp, prepareLocalDirs(activeLocalDirs, MERGE_DIRECTORY), + INVALID_MERGE_DIRECTORY_META); + } catch (IllegalArgumentException re) { + assertEquals( + "Failed to get the merge directory information from the shuffleManagerMeta " + + "shuffleManager:{\"mergeDirInvalid\": \"merge_manager_2\", \"attemptId\": \"2\"} in " + + "executor registration message", re.getMessage()); + throw re; + } + } + + @Test(expected = IllegalArgumentException.class) + public void testExecutorRegistrationFromTwoAppAttempts() throws IOException { + String testApp = "testExecutorRegistrationFromTwoAppAttempts"; + Path[] attempt1LocalDirs = createLocalDirs(1); + registerExecutor(testApp, + prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); + assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1); + assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains( + attempt1LocalDirs[0].toFile().getPath())); + // Any later executor register from the same application attempt should not change the active + // local dirs list + Path[] attempt1UpdatedLocalDirs = localDirs; + registerExecutor(testApp, + prepareLocalDirs(attempt1UpdatedLocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); + assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1); + assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains( + attempt1LocalDirs[0].toFile().getPath())); + // But a new attempt from the same application can change the active local dirs list + Path[] attempt2LocalDirs = createLocalDirs(2); + registerExecutor(testApp, + prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2), + MERGE_DIRECTORY_META_2); + assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 2); + 
assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains( + attempt2LocalDirs[0].toFile().getPath())); + removeApplication(testApp); + try { + pushResolver.getMergedBlockDirs(testApp); + } catch (IllegalArgumentException e) { + assertEquals(e.getMessage(), + "application " + testApp + " is not registered or NM was restarted."); + throw e; } } @@ -407,17 +493,18 @@ public void testCleanUpDirectory() throws IOException, InterruptedException { Semaphore deleted = new Semaphore(0); pushResolver = new RemoteBlockPushResolver(conf) { @Override - void deleteExecutorDirs(Path[] dirs) { - super.deleteExecutorDirs(dirs); + void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) { + super.deleteExecutorDirs(appShuffleInfo); deleted.release(); } }; + Path[] activeDirs = createLocalDirs(1); - registerExecutor(testApp, prepareLocalDirs(activeDirs)); + registerExecutor(testApp, prepareLocalDirs(activeDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META); PushBlock[] pushBlocks = new PushBlock[] { new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[4]))}; - pushBlockHelper(testApp, pushBlocks); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, 0)); + pushBlockHelper(testApp, NO_ATTEMPT_ID, pushBlocks); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(testApp, 0, 0); validateChunks(testApp, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}}); String[] mergeDirs = pushResolver.getMergedBlockDirs(testApp); @@ -435,7 +522,7 @@ public void testRecoverIndexFileAfterIOExceptions() throws IOException { useTestFiles(true, false); RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); callback1.onComplete(callback1.getID()); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); @@ -443,7 +530,7 @@ public void testRecoverIndexFileAfterIOExceptions() throws IOException { TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); testIndexFile.close(); StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); // This will complete without any IOExceptions because number of IOExceptions are less than // the threshold but the update to index file will be unsuccessful. @@ -452,12 +539,12 @@ public void testRecoverIndexFileAfterIOExceptions() throws IOException { // Restore the index stream so it can write successfully again. 
testIndexFile.restore(); StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0)); callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2])); callback3.onComplete(callback3.getID()); assertEquals("index position", 24, testIndexFile.getPos()); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); validateMergeStatuses(statuses, new int[] {0}, new long[] {11}); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][] {{0}, {1, 2}}); @@ -468,7 +555,7 @@ public void testRecoverIndexFileAfterIOExceptionsInFinalize() throws IOException useTestFiles(true, false); RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); callback1.onComplete(callback1.getID()); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); @@ -476,7 +563,7 @@ public void testRecoverIndexFileAfterIOExceptionsInFinalize() throws IOException TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); testIndexFile.close(); StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); // This will complete without any IOExceptions because number of IOExceptions are less than // the threshold but the update to index file will be unsuccessful. @@ -486,7 +573,7 @@ public void testRecoverIndexFileAfterIOExceptionsInFinalize() throws IOException // Restore the index stream so it can write successfully again. 
testIndexFile.restore(); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); assertEquals("index position", 24, testIndexFile.getPos()); validateMergeStatuses(statuses, new int[] {0}, new long[] {9}); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); @@ -498,7 +585,7 @@ public void testRecoverMetaFileAfterIOExceptions() throws IOException { useTestFiles(false, true); RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); callback1.onComplete(callback1.getID()); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); @@ -507,7 +594,7 @@ public void testRecoverMetaFileAfterIOExceptions() throws IOException { long metaPosBeforeClose = testMetaFile.getPos(); testMetaFile.close(); StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); // This will complete without any IOExceptions because number of IOExceptions are less than // the threshold but the update to index and meta file will be unsuccessful. @@ -517,13 +604,13 @@ public void testRecoverMetaFileAfterIOExceptions() throws IOException { // Restore the meta stream so it can write successfully again. testMetaFile.restore(); StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0)); callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2])); callback3.onComplete(callback3.getID()); assertEquals("index position", 24, partitionInfo.getIndexFile().getPos()); assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); validateMergeStatuses(statuses, new int[] {0}, new long[] {11}); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][] {{0}, {1, 2}}); @@ -534,7 +621,7 @@ public void testRecoverMetaFileAfterIOExceptionsInFinalize() throws IOException useTestFiles(false, true); RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); callback1.onComplete(callback1.getID()); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); @@ -543,7 +630,7 @@ public void testRecoverMetaFileAfterIOExceptionsInFinalize() throws IOException long metaPosBeforeClose = testMetaFile.getPos(); testMetaFile.close(); StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); // This 
will complete without any IOExceptions because number of IOExceptions are less than // the threshold but the update to index and meta file will be unsuccessful. @@ -554,7 +641,7 @@ public void testRecoverMetaFileAfterIOExceptionsInFinalize() throws IOException // Restore the meta stream so it can write successfully again. testMetaFile.restore(); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); assertEquals("index position", 24, indexFile.getPos()); assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose); validateMergeStatuses(statuses, new int[] {0}, new long[] {9}); @@ -562,11 +649,11 @@ public void testRecoverMetaFileAfterIOExceptionsInFinalize() throws IOException validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 5}, new int[][] {{0}, {1}}); } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testIOExceptionsExceededThreshold() throws IOException { RemoteBlockPushResolver.PushBlockStreamCallback callback = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4])); callback.onComplete(callback.getID()); @@ -575,7 +662,7 @@ public void testIOExceptionsExceededThreshold() throws IOException { for (int i = 1; i < 5; i++) { RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, i, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0)); try { callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[2])); } catch (IOException ioe) { @@ -588,7 +675,7 @@ public void testIOExceptionsExceededThreshold() throws IOException { try { RemoteBlockPushResolver.PushBlockStreamCallback callback2 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 5, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 5, 0, 0)); callback2.onData(callback.getID(), ByteBuffer.wrap(new byte[1])); } catch (Throwable t) { assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_5_0", @@ -597,12 +684,12 @@ public void testIOExceptionsExceededThreshold() throws IOException { } } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testIOExceptionsDuringMetaUpdateIncreasesExceptionCount() throws IOException { useTestFiles(true, false); RemoteBlockPushResolver.PushBlockStreamCallback callback = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4])); callback.onComplete(callback.getID()); @@ -611,7 +698,7 @@ public void testIOExceptionsDuringMetaUpdateIncreasesExceptionCount() throws IOE for (int i = 1; i < 5; i++) { RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new 
PushBlockStream(TEST_APP, 0, i, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0)); callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5])); // This will complete without any exceptions but the exception count is increased. callback1.onComplete(callback1.getID()); @@ -622,7 +709,7 @@ public void testIOExceptionsDuringMetaUpdateIncreasesExceptionCount() throws IOE try { RemoteBlockPushResolver.PushBlockStreamCallback callback2 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 5, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 5, 0, 0)); callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[4])); callback2.onComplete(callback2.getID()); } catch (Throwable t) { @@ -632,7 +719,7 @@ public void testIOExceptionsDuringMetaUpdateIncreasesExceptionCount() throws IOE } } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testRequestForAbortedShufflePartitionThrowsException() { try { testIOExceptionsDuringMetaUpdateIncreasesExceptionCount(); @@ -641,7 +728,7 @@ public void testRequestForAbortedShufflePartitionThrowsException() { } try { pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 10, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 10, 0, 0)); } catch (Throwable t) { assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_10_0", t.getMessage()); @@ -649,19 +736,19 @@ public void testRequestForAbortedShufflePartitionThrowsException() { } } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testPendingBlockIsAbortedImmediately() throws IOException { useTestFiles(true, false); RemoteBlockPushResolver.PushBlockStreamCallback callback = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); testIndexFile.close(); for (int i = 1; i < 6; i++) { RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, i, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0)); try { callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5])); // This will complete without any exceptions but the exception count is increased. 
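The hunks on either side of this point all exercise the same server-side rule: each merged shuffle partition counts the IOExceptions raised while writing its data, index, and meta files, and once the count reaches the configured threshold (these tests drive four failures in and expect the fifth push to be rejected), any further merge activity for that partition is aborted. Below is a minimal sketch of that bookkeeping, using hypothetical names rather than the patch's actual classes:

    // Illustrative only: per-partition IOException accounting for merged shuffle files.
    final class IOExceptionTracker {
      private final int threshold; // e.g. TransportConf's ioExceptionsThresholdDuringMerge()
      private int numIOExceptions = 0;

      IOExceptionTracker(int threshold) {
        this.threshold = threshold;
      }

      // Invoked whenever a write to the merged data/index/meta file throws.
      synchronized void recordIOException() {
        numIOExceptions++;
      }

      // Invoked before accepting more data for this partition; the message matches
      // the failures asserted in the surrounding tests.
      synchronized void abortIfExceeded(String blockId) {
        if (numIOExceptions >= threshold) {
          throw new RuntimeException(
            "IOExceptions exceeded the threshold when merging " + blockId);
        }
      }
    }

With a threshold of 4, the four onData failures in the loop above push the count to the limit, so the next push (shufflePush_0_5_0) fails exactly as the tests assert.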
@@ -682,19 +769,19 @@ public void testPendingBlockIsAbortedImmediately() throws IOException { } } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testWritingPendingBufsIsAbortedImmediatelyDuringComplete() throws IOException { useTestFiles(true, false); RemoteBlockPushResolver.PushBlockStreamCallback callback = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); testIndexFile.close(); for (int i = 1; i < 5; i++) { RemoteBlockPushResolver.PushBlockStreamCallback callback1 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, i, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0)); try { callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5])); // This will complete without any exceptions but the exception count is increased. @@ -706,7 +793,7 @@ public void testWritingPendingBufsIsAbortedImmediatelyDuringComplete() throws IO assertEquals(4, partitionInfo.getNumIOExceptions()); RemoteBlockPushResolver.PushBlockStreamCallback callback2 = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 5, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 5, 0, 0)); callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); // This is deferred callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4])); @@ -738,10 +825,10 @@ public void testFailureWhileTruncatingFiles() throws IOException { new PushBlock(0, 0, 1, ByteBuffer.wrap(new byte[5])), new PushBlock(0, 1, 1, ByteBuffer.wrap(new byte[3])) }; - pushBlockHelper(TEST_APP, pushBlocks); + pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks); RemoteBlockPushResolver.PushBlockStreamCallback callback = (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0)); callback.onData(callback.getID(), ByteBuffer.wrap(new byte[2])); callback.onComplete(callback.getID()); RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); @@ -749,7 +836,7 @@ public void testFailureWhileTruncatingFiles() throws IOException { // Close the index file so truncate throws IOException testIndexFile.close(); MergeStatuses statuses = pushResolver.finalizeShuffleMerge( - new FinalizeShuffleMerge(TEST_APP, 0)); + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); validateMergeStatuses(statuses, new int[] {1}, new long[] {8}); MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 1); validateChunks(TEST_APP, 0, 1, meta, new int[]{5, 3}, new int[][]{{0},{1}}); } @@ -758,46 +845,53 @@ public void testFailureWhileTruncatingFiles() throws IOException { @Test public void testOnFailureInvokedMoreThanOncePerBlock() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); stream1.onFailure(stream1.getID(), new
RuntimeException("forced error")); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5])); // On failure on stream1 gets invoked again and should cause no interference stream1.onFailure(stream1.getID(), new RuntimeException("2nd forced error")); StreamCallbackWithID stream3 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 3, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 3, 0, 0)); // This should be deferred as stream 2 is still the active stream stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2])); // Stream 2 writes more and completes stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4])); stream2.onComplete(stream2.getID()); stream3.onComplete(stream3.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {9, 2}, new int[][] {{1},{3}}); removeApplication(TEST_APP); } - @Test (expected = RuntimeException.class) + @Test(expected = RuntimeException.class) public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws IOException { StreamCallbackWithID stream1 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); StreamCallbackWithID stream1Duplicate = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); stream1.onComplete(stream1.getID()); stream1Duplicate.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); StreamCallbackWithID stream2 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5])); // Should not change the current map id of the reduce partition stream1Duplicate.onFailure(stream2.getID(), new RuntimeException("forced error")); StreamCallbackWithID stream3 = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0)); // This should be deferred as stream 2 is still the active stream stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2])); RuntimeException failedEx = null; @@ -812,7 +906,7 @@ public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws // Stream 2 writes more and completes stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4])); stream2.onComplete(stream2.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {11}, new int[][] {{0, 1}}); removeApplication(TEST_APP); @@ 
-821,20 +915,165 @@ public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws } } + @Test(expected = IllegalArgumentException.class) + public void testPushBlockFromPreviousAttemptIsRejected() + throws IOException, InterruptedException { + Semaphore closed = new Semaphore(0); + pushResolver = new RemoteBlockPushResolver(conf) { + @Override + void closeAndDeletePartitionFilesIfNeeded( + AppShuffleInfo appShuffleInfo, + boolean cleanupLocalDirs) { + super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs); + closed.release(); + } + }; + String testApp = "testPushBlockFromPreviousAttemptIsRejected"; + Path[] attempt1LocalDirs = createLocalDirs(1); + registerExecutor(testApp, + prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); + ByteBuffer[] blocks = new ByteBuffer[]{ + ByteBuffer.wrap(new byte[4]), + ByteBuffer.wrap(new byte[5]) + }; + StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 1, 0, 0, 0, 0)); + for (ByteBuffer block : blocks) { + stream1.onData(stream1.getID(), block); + } + stream1.onComplete(stream1.getID()); + RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo = + pushResolver.validateAndGetAppShuffleInfo(testApp); + Map<Integer, Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo>> partitions = + appShuffleInfo.getPartitions(); + for (Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo> partitionMap : + partitions.values()) { + for (RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo : partitionMap.values()) { + assertTrue(partitionInfo.getDataChannel().isOpen()); + assertTrue(partitionInfo.getMetaFile().getChannel().isOpen()); + assertTrue(partitionInfo.getIndexFile().getChannel().isOpen()); + } + } + Path[] attempt2LocalDirs = createLocalDirs(2); + registerExecutor(testApp, + prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2), + MERGE_DIRECTORY_META_2); + StreamCallbackWithID stream2 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 2, 0, 1, 0, 0)); + for (ByteBuffer block : blocks) { + stream2.onData(stream2.getID(), block); + } + stream2.onComplete(stream2.getID()); + closed.acquire(); + // Check if all the file channels created for the first attempt are safely closed.
+ for (Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo> partitionMap : + partitions.values()) { + for (RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo : partitionMap.values()) { + assertFalse(partitionInfo.getDataChannel().isOpen()); + assertFalse(partitionInfo.getMetaFile().getChannel().isOpen()); + assertFalse(partitionInfo.getIndexFile().getChannel().isOpen()); + } + } + try { + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 1, 0, 1, 0, 0)); + } catch (IllegalArgumentException re) { + assertEquals( + "The attempt id 1 in this PushBlockStream message does not match " + + "with the current attempt id 2 stored in shuffle service for application " + + testApp, re.getMessage()); + throw re; + } + } + + @Test(expected = IllegalArgumentException.class) + public void testFinalizeShuffleMergeFromPreviousAttemptIsAborted() + throws IOException, InterruptedException { + String testApp = "testFinalizeShuffleMergeFromPreviousAttemptIsAborted"; + Path[] attempt1LocalDirs = createLocalDirs(1); + registerExecutor(testApp, + prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); + ByteBuffer[] blocks = new ByteBuffer[]{ + ByteBuffer.wrap(new byte[4]), + ByteBuffer.wrap(new byte[5]) + }; + StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 1, 0, 0, 0, 0)); + for (ByteBuffer block : blocks) { + stream1.onData(stream1.getID(), block); + } + stream1.onComplete(stream1.getID()); + Path[] attempt2LocalDirs = createLocalDirs(2); + registerExecutor(testApp, + prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2), + MERGE_DIRECTORY_META_2); + try { + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, ATTEMPT_ID_1, 0)); + } catch (IllegalArgumentException e) { + assertEquals(e.getMessage(), + String.format("The attempt id %s in this FinalizeShuffleMerge message does not " + + "match with the current attempt id %s stored in shuffle service for application %s", + ATTEMPT_ID_1, ATTEMPT_ID_2, testApp)); + throw e; + } + } + + @Test(expected = ClosedChannelException.class) + public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted() + throws IOException, InterruptedException { + Semaphore closed = new Semaphore(0); + pushResolver = new RemoteBlockPushResolver(conf) { + @Override + void closeAndDeletePartitionFilesIfNeeded( + AppShuffleInfo appShuffleInfo, + boolean cleanupLocalDirs) { + super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs); + closed.release(); + } + }; + String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted"; + Path[] attempt1LocalDirs = createLocalDirs(1); + registerExecutor(testApp, + prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); + ByteBuffer[] blocks = new ByteBuffer[]{ + ByteBuffer.wrap(new byte[4]), + ByteBuffer.wrap(new byte[5]), + ByteBuffer.wrap(new byte[6]), + ByteBuffer.wrap(new byte[7]) + }; + StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 1, 0, 0, 0, 0)); + // The onData callback should be called 4 times here before the onComplete callback. But a + // register executor message arrives in shuffle service after the 2nd onData callback. The 3rd + // onData callback should throw ClosedChannelException as its channel is closed.
+ stream1.onData(stream1.getID(), blocks[0]); + stream1.onData(stream1.getID(), blocks[1]); + Path[] attempt2LocalDirs = createLocalDirs(2); + registerExecutor(testApp, + prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2), + MERGE_DIRECTORY_META_2); + closed.acquire(); + // Should throw ClosedChannelException here. + stream1.onData(stream1.getID(), blocks[3]); + } + private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException { pushResolver = new RemoteBlockPushResolver(conf) { @Override - AppShufflePartitionInfo newAppShufflePartitionInfo(AppShuffleId appShuffleId, int reduceId, - File dataFile, File indexFile, File metaFile) throws IOException { + AppShufflePartitionInfo newAppShufflePartitionInfo(String appId, int shuffleId, + int reduceId, File dataFile, File indexFile, File metaFile) throws IOException { MergeShuffleFile mergedIndexFile = useTestIndexFile ? new TestMergeShuffleFile(indexFile) : new MergeShuffleFile(indexFile); MergeShuffleFile mergedMetaFile = useTestMetaFile ? new TestMergeShuffleFile(metaFile) : new MergeShuffleFile(metaFile); - return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, mergedIndexFile, + return new AppShufflePartitionInfo(appId, shuffleId, reduceId, dataFile, mergedIndexFile, mergedMetaFile); } }; - registerExecutor(TEST_APP, prepareLocalDirs(localDirs)); + registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META); } private Path[] createLocalDirs(int numLocalDirs) throws IOException { @@ -846,16 +1085,15 @@ private Path[] createLocalDirs(int numLocalDirs) throws IOException { return localDirs; } - private void registerExecutor(String appId, String[] localDirs) throws IOException { - ExecutorShuffleInfo shuffleInfo = new ExecutorShuffleInfo(localDirs, 1, "mergedShuffle"); + private void registerExecutor(String appId, String[] localDirs, String shuffleManagerMeta) { + ExecutorShuffleInfo shuffleInfo = new ExecutorShuffleInfo(localDirs, 1, shuffleManagerMeta); pushResolver.registerExecutor(appId, shuffleInfo); } - private String[] prepareLocalDirs(Path[] localDirs) throws IOException { + private String[] prepareLocalDirs(Path[] localDirs, String mergeDir) throws IOException { String[] blockMgrDirs = new String[localDirs.length]; for (int i = 0; i< localDirs.length; i++) { - Files.createDirectories(localDirs[i].resolve( - RemoteBlockPushResolver.MERGE_MANAGER_DIR + File.separator + "00")); + Files.createDirectories(localDirs[i].resolve(mergeDir + File.separator + "00")); blockMgrDirs[i] = localDirs[i].toFile().getPath() + File.separator + BLOCK_MANAGER_DIR; } return blockMgrDirs; @@ -898,10 +1136,12 @@ private void validateChunks( private void pushBlockHelper( String appId, + int attemptId, PushBlock[] blocks) throws IOException { for (int i = 0; i < blocks.length; i++) { StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(appId, blocks[i].shuffleId, blocks[i].mapIndex, blocks[i].reduceId, 0)); + new PushBlockStream( + appId, attemptId, blocks[i].shuffleId, blocks[i].mapIndex, blocks[i].reduceId, 0)); stream.onData(stream.getID(), blocks[i].buffer); stream.onComplete(stream.getID()); } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ef47252189bbb..d11fa554ca8c7 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -583,6 +583,7 @@ class 
SparkContext(config: SparkConf) extends Logging { _applicationId = _taskScheduler.applicationId() _applicationAttemptId = _taskScheduler.applicationAttemptId() _conf.set("spark.app.id", _applicationId) + _applicationAttemptId.foreach(attemptId => _conf.set(APP_ATTEMPT_ID, attemptId)) if (_conf.get(UI_REVERSE_PROXY)) { val proxyUrl = _conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") + "/proxy/" + _applicationId diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 3ef964fcb8fd9..39c526cb0e8b3 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2244,4 +2244,14 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val APP_ATTEMPT_ID = + ConfigBuilder("spark.app.attempt.id") + .internal() + .doc("The application attempt Id assigned from Hadoop YARN. " + + "When the application runs in cluster mode on YARN, there can be " + + "multiple attempts before failing the application") + .version("3.2.0") + .stringConf + .createOptional } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 8e96d441a6cdc..876ac54a4afb9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -537,10 +537,17 @@ private[spark] class BlockManager( private def registerWithExternalShuffleServer(): Unit = { logInfo("Registering executor with local external shuffle service.") + val shuffleManagerMeta = + if (Utils.isPushBasedShuffleEnabled(conf)) { + s"${shuffleManager.getClass.getName}:" + + s"${diskBlockManager.getMergeDirectoryAndAttemptIDJsonString()}}}" + } else { + shuffleManager.getClass.getName + } val shuffleConfig = new ExecutorShuffleInfo( diskBlockManager.localDirsString, diskBlockManager.subDirsPerLocalDir, - shuffleManager.getClass.getName) + shuffleManagerMeta) val MAX_ATTEMPTS = conf.get(config.SHUFFLE_REGISTRATION_MAX_ATTEMPTS) val SLEEP_TIME_SECS = 5 diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index d49f43f38048a..d92f686b41045 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -21,11 +21,18 @@ import java.io.{File, IOException} import java.nio.file.Files import java.util.UUID +import scala.collection.mutable.HashMap + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + import org.apache.spark.SparkConf import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.internal.{config, Logging} import org.apache.spark.network.shuffle.ExecutorDiskUtils -import org.apache.spark.storage.DiskBlockManager.MERGE_MANAGER_DIR +import org.apache.spark.storage.DiskBlockManager.ATTEMPT_ID_KEY +import org.apache.spark.storage.DiskBlockManager.MERGE_DIR_KEY +import org.apache.spark.storage.DiskBlockManager.MERGE_DIRECTORY import org.apache.spark.util.{ShutdownHookManager, Utils} /** @@ -57,6 +64,10 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo // of subDirs(i) is protected by the lock of subDirs(i) private val subDirs = Array.fill(localDirs.length)(new 
Array[File](subDirsPerLocalDir)) + // Get merge directory name, append attemptId if there is any + private val mergeDirName = + s"$MERGE_DIRECTORY${conf.get(config.APP_ATTEMPT_ID).map(id => s"_$id").getOrElse("")}" + // Create merge directories createLocalDirsForMergedShuffleBlocks() @@ -200,12 +211,12 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo // Will create the merge_manager directory only if it doesn't exist under the local dir. Utils.getConfiguredLocalDirs(conf).foreach { rootDir => try { - val mergeDir = new File(rootDir, MERGE_MANAGER_DIR) + val mergeDir = new File(rootDir, mergeDirName) if (!mergeDir.exists()) { // This executor does not find merge_manager directory, it will try to create // the merge_manager directory and the sub directories. logDebug(s"Try to create $mergeDir and its sub dirs since the " + - s"$MERGE_MANAGER_DIR dir does not exist") + s"$mergeDirName dir does not exist") for (dirNum <- 0 until subDirsPerLocalDir) { val subDir = new File(mergeDir, "%02x".format(dirNum)) if (!subDir.exists()) { @@ -219,7 +230,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo } catch { case e: IOException => logError( - s"Failed to create $MERGE_MANAGER_DIR dir in $rootDir. Ignoring this directory.", e) + s"Failed to create $mergeDirName dir in $rootDir. Ignoring this directory.", e) } } } @@ -264,6 +275,17 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo } } + def getMergeDirectoryAndAttemptIDJsonString(): String = { + val mergedMetaMap: HashMap[String, String] = new HashMap[String, String]() + mergedMetaMap.put(MERGE_DIR_KEY, mergeDirName) + conf.get(config.APP_ATTEMPT_ID).foreach( + attemptId => mergedMetaMap.put(ATTEMPT_ID_KEY, attemptId)) + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + val jsonString = mapper.writeValueAsString(mergedMetaMap) + jsonString + } + private def addShutdownHook(): AnyRef = { logDebug("Adding shutdown hook") // force eager creation of logger ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () => @@ -303,5 +325,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo } private[spark] object DiskBlockManager { - private[spark] val MERGE_MANAGER_DIR = "merge_manager" + val MERGE_DIRECTORY = "merge_manager" + val MERGE_DIR_KEY = "mergeDir" + val ATTEMPT_ID_KEY = "attemptId" } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 1a6a689e2e901..7ea96fea25a80 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2598,32 +2598,13 @@ private[spark] object Utils extends Logging { /** * Push based shuffle can only be enabled when the application is submitted - * to run in YARN mode, with external shuffle service enabled and - * spark.yarn.maxAttempts or the yarn cluster default max attempts is set to 1. 
- * TODO: Remove the requirement on spark.yarn.maxAttempts after SPARK-35546 - * Support push based shuffle with multiple app attempts + * to run in YARN mode, with external shuffle service enabled */ def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = { conf.get(PUSH_BASED_SHUFFLE_ENABLED) && (conf.get(IS_TESTING).getOrElse(false) || (conf.get(SHUFFLE_SERVICE_ENABLED) && - conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn" && - getYarnMaxAttempts(conf) == 1)) - } - - /** - * Returns the maximum number of attempts to register the AM in YARN mode. - * TODO: Remove this method after SPARK-35546 Support push based shuffle - * with multiple app attempts - */ - def getYarnMaxAttempts(conf: SparkConf): Int = { - val sparkMaxAttempts = conf.getOption("spark.yarn.maxAttempts").map(_.toInt) - val yarnMaxAttempts = getSparkOrYarnConfig(conf, YarnConfiguration.RM_AM_MAX_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS.toString).toInt - sparkMaxAttempts match { - case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts - case None => yarnMaxAttempts - } + conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn")) } /** diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 6397c96f36d3b..0443c40bce3a2 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -20,7 +20,10 @@ package org.apache.spark.storage import java.io.{File, FileWriter} import java.nio.file.{Files, Paths} import java.nio.file.attribute.PosixFilePermissions +import java.util.HashMap +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.databind.ObjectMapper import org.apache.commons.io.FileUtils import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} @@ -91,11 +94,11 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B } test("should still create merge directories if one already exists under a local dir") { - val mergeDir0 = new File(rootDir0, DiskBlockManager.MERGE_MANAGER_DIR) + val mergeDir0 = new File(rootDir0, DiskBlockManager.MERGE_DIRECTORY) if (!mergeDir0.exists()) { Files.createDirectories(mergeDir0.toPath) } - val mergeDir1 = new File(rootDir1, DiskBlockManager.MERGE_MANAGER_DIR) + val mergeDir1 = new File(rootDir1, DiskBlockManager.MERGE_DIRECTORY) if (mergeDir1.exists()) { Utils.deleteRecursively(mergeDir1) } @@ -104,7 +107,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B testConf.set(config.Tests.IS_TESTING, true) diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true) assert(Utils.getConfiguredLocalDirs(testConf).map( - rootDir => new File(rootDir, DiskBlockManager.MERGE_MANAGER_DIR)) + rootDir => new File(rootDir, DiskBlockManager.MERGE_DIRECTORY)) .filter(mergeDir => mergeDir.exists()).length === 2) // mergeDir0 will be skipped as it already exists assert(mergeDir0.list().length === 0) @@ -124,6 +127,20 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B FileUtils.deleteQuietly(testDir) } + test("Encode merged directory name and attemptId in shuffleManager field") { + testConf.set(config.APP_ATTEMPT_ID, "1"); + diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true) + val mergedShuffleMeta = diskBlockManager.getMergeDirectoryAndAttemptIDJsonString(); + val mapper: ObjectMapper = new ObjectMapper + val 
typeRef: TypeReference[HashMap[String, String]] = + new TypeReference[HashMap[String, String]]() {} + val metaMap: HashMap[String, String] = mapper.readValue(mergedShuffleMeta, typeRef) + val mergeDir = metaMap.get(DiskBlockManager.MERGE_DIR_KEY) + assert(mergeDir.equals(DiskBlockManager.MERGE_DIRECTORY + "_1")) + val attemptId = metaMap.get(DiskBlockManager.ATTEMPT_ID_KEY) + assert(attemptId.equals("1")) + } + def writeToFile(file: File, numBytes: Int): Unit = { val writer = new FileWriter(file, true) for (i <- 0 until numBytes) writer.write(i) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index f34b1c0f8d108..677efecb35f08 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -1512,7 +1512,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { conf.set("spark.yarn.maxAttempts", "1") assert(Utils.isPushBasedShuffleEnabled(conf) === true) conf.set("spark.yarn.maxAttempts", "2") - assert(Utils.isPushBasedShuffleEnabled(conf) === false) + assert(Utils.isPushBasedShuffleEnabled(conf) === true) } }
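Taken together, the Scala changes (DiskBlockManager.getMergeDirectoryAndAttemptIDJsonString) and the Java changes above define one handshake: the executor registers with "shuffleManagerClass:json", the shuffle service parses the JSON to learn the merge directory and attempt id, and every later PushBlockStream or FinalizeShuffleMerge message is checked against the stored attempt id. Below is a minimal sketch of the service side, assuming a hypothetical MergeDirectoryMeta holder and helper names that are not the patch's exact types:

    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public final class ShuffleManagerMetaSketch {
      // Assumed JSON shape, matching the test constants above, e.g.
      // shuffleManager:{"mergeDir": "merge_manager_1", "attemptId": "1"}
      public static class MergeDirectoryMeta {
        @JsonProperty("mergeDir") public String mergeDir;
        @JsonProperty("attemptId") public String attemptId = "-1"; // -1 when no attempt id is set
      }

      private static final ObjectMapper MAPPER = new ObjectMapper();

      // Split "shuffleManagerClass:json" and parse the JSON part; a malformed payload
      // surfaces as IllegalArgumentException, as the invalid-meta test expects.
      public static MergeDirectoryMeta parse(String shuffleManagerMeta) {
        int idx = shuffleManagerMeta.indexOf(':');
        if (idx >= 0) {
          try {
            return MAPPER.readValue(
              shuffleManagerMeta.substring(idx + 1), MergeDirectoryMeta.class);
          } catch (java.io.IOException e) {
            // fall through to the error below
          }
        }
        throw new IllegalArgumentException(
          "Failed to get the merge directory information from the shuffleManagerMeta "
            + shuffleManagerMeta + " in executor registration message");
      }

      // Reject a message whose attempt id differs from the one currently registered,
      // mirroring the errors asserted in the "FromPreviousAttempt" tests.
      public static void checkAttemptId(int storedAttemptId, int msgAttemptId, String appId) {
        if (storedAttemptId != msgAttemptId) {
          throw new IllegalArgumentException(
            "The attempt id " + msgAttemptId + " in this PushBlockStream message does not "
              + "match with the current attempt id " + storedAttemptId
              + " stored in shuffle service for application " + appId);
        }
      }
    }

A registration from a newer attempt replaces the stored attempt id and merge directories, which is why the tests expect the older attempt's file channels to be closed and its later pushes rejected.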