Skip to content

Commit

Permalink
[SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to su…
Browse files Browse the repository at this point in the history
…pport push shuffle blocks

### What changes were proposed in this pull request?

This is the first patch for SPIP SPARK-30602 for push-based shuffle.
Summary of changes:
* Introduce new API in ExternalBlockStoreClient to push blocks to a remote shuffle service.
* Leveraging the streaming upload functionality in SPARK-6237, it also enables the ExternalBlockHandler to delegate the handling of block push requests to MergedShuffleFileManager.
* Propose the API for MergedShuffleFileManager, where the core logic on the shuffle service side to handle block push requests is defined. The actual implementation of this API is deferred into a later RB to restrict the size of this PR.
* Introduce OneForOneBlockPusher to enable pushing blocks to remote shuffle services in shuffle RPC layer.
* New protocols in shuffle RPC layer to support the functionalities.

### Why are the changes needed?

Refer to the SPIP in SPARK-30602

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.

Lead-authored-by: Min Shen <mshenlinkedin.com>
Co-authored-by: Chandni Singh <chsinghlinkedin.com>
Co-authored-by: Ye Zhou <yezhoulinkedin.com>

Closes #29855 from Victsm/SPARK-32915.

Lead-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Co-authored-by: Ye Zhou <yezhou@linkedin.com>
Co-authored-by: Chandni Singh <singh.chandni@gmail.com>
Co-authored-by: Min Shen <victor.nju@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
5 people authored and Mridul Muralidharan committed Oct 15, 2020
1 parent b089fe5 commit 82eea13
Show file tree
Hide file tree
Showing 21 changed files with 1,212 additions and 15 deletions.
4 changes: 4 additions & 0 deletions common/network-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.spark.network.protocol;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import io.netty.buffer.ByteBuf;
import org.roaringbitmap.RoaringBitmap;

/** Provides a canonical set of Encoders for simple types. */
public class Encoders {
Expand All @@ -44,6 +46,40 @@ public static String decode(ByteBuf buf) {
}
}

/** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
public static class Bitmaps {
public static int encodedLength(RoaringBitmap b) {
// Compress the bitmap before serializing it. Note that since BlockTransferMessage
// needs to invoke encodedLength first to figure out the length for the ByteBuf, it
// guarantees that the bitmap will always be compressed before being serialized.
b.trim();
b.runOptimize();
return b.serializedSizeInBytes();
}

public static void encode(ByteBuf buf, RoaringBitmap b) {
int encodedLength = b.serializedSizeInBytes();
// RoaringBitmap requires nio ByteBuffer for serde. We expose the netty ByteBuf as a nio
// ByteBuffer. Here, we need to explicitly manage the index so we can write into the
// ByteBuffer, and the write is reflected in the underneath ByteBuf.
b.serialize(buf.nioBuffer(buf.writerIndex(), encodedLength));
buf.writerIndex(buf.writerIndex() + encodedLength);
}

public static RoaringBitmap decode(ByteBuf buf) {
RoaringBitmap bitmap = new RoaringBitmap();
try {
bitmap.deserialize(buf.nioBuffer());
// RoaringBitmap deserialize does not advance the reader index of the underlying ByteBuf.
// Manually update the index here.
buf.readerIndex(buf.readerIndex() + bitmap.serializedSizeInBytes());
} catch (IOException e) {
throw new RuntimeException("Exception while decoding bitmap", e);
}
return bitmap;
}
}

/** Byte arrays are encoded with their length followed by bytes. */
public static class ByteArrays {
public static int encodedLength(byte[] arr) {
Expand Down Expand Up @@ -135,4 +171,31 @@ public static long[] decode(ByteBuf buf) {
return longs;
}
}

/** Bitmap arrays are encoded with the number of bitmaps followed by per-Bitmap encoding. */
public static class BitmapArrays {
public static int encodedLength(RoaringBitmap[] bitmaps) {
int totalLength = 4;
for (RoaringBitmap b : bitmaps) {
totalLength += Bitmaps.encodedLength(b);
}
return totalLength;
}

public static void encode(ByteBuf buf, RoaringBitmap[] bitmaps) {
buf.writeInt(bitmaps.length);
for (RoaringBitmap b : bitmaps) {
Bitmaps.encode(buf, b);
}
}

public static RoaringBitmap[] decode(ByteBuf buf) {
int numBitmaps = buf.readInt();
RoaringBitmap[] bitmaps = new RoaringBitmap[numBitmaps];
for (int i = 0; i < bitmaps.length; i ++) {
bitmaps[i] = Bitmaps.decode(buf);
}
return bitmaps;
}
}
}
9 changes: 9 additions & 0 deletions common/network-shuffle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down Expand Up @@ -93,6 +97,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
Expand Down Expand Up @@ -135,4 +136,24 @@ public void onFailure(Throwable t) {
hostLocalDirsCompletable.completeExceptionally(e);
}
}

/**
* Push a sequence of shuffle blocks in a best-effort manner to a remote node asynchronously.
* These shuffle blocks, along with blocks pushed by other clients, will be merged into
* per-shuffle partition merged shuffle files on the destination node.
*
* @param host the host of the remote node.
* @param port the port of the remote node.
* @param blockIds block ids to be pushed
* @param buffers buffers to be pushed
* @param listener the listener to receive block push status.
*/
public void pushBlocks(
String host,
int port,
String[] blockIds,
ManagedBuffer[] buffers,
BlockFetchingListener listener) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import java.net.ConnectException;

import com.google.common.base.Throwables;

/**
* Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
* and logged.
* Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
* - remaining retries < max retries
* - exception is an IOException
*/

public interface ErrorHandler {

boolean shouldRetryError(Throwable t);

default boolean shouldLogError(Throwable t) {
return true;
}

/**
* A no-op error handler instance.
*/
ErrorHandler NOOP_ERROR_HANDLER = t -> true;

/**
* The error handler for pushing shuffle blocks to remote shuffle services.
*/
class BlockPushErrorHandler implements ErrorHandler {
/**
* String constant used for generating exception messages indicating a block to be merged
* arrives too late on the server side, and also for later checking such exceptions on the
* client side. When we get a block push failure because of the block arrives too late, we
* will not retry pushing the block nor log the exception on the client side.
*/
public static final String TOO_LATE_MESSAGE_SUFFIX =
"received after merged shuffle is finalized";

/**
* String constant used for generating exception messages indicating the server couldn't
* append a block after all available attempts due to collision with other blocks belonging
* to the same shuffle partition, and also for later checking such exceptions on the client
* side. When we get a block push failure because of the block couldn't be written due to
* this reason, we will not log the exception on the client side.
*/
public static final String BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX =
"Couldn't find an opportunity to write block";

@Override
public boolean shouldRetryError(Throwable t) {
// If it is a connection time out or a connection closed exception, no need to retry.
if (t.getCause() != null && t.getCause() instanceof ConnectException) {
return false;
}
// If the block is too late, there is no need to retry it
return !Throwables.getStackTraceAsString(t).contains(TOO_LATE_MESSAGE_SUFFIX);
}

@Override
public boolean shouldLogError(Throwable t) {
String errorStackTrace = Throwables.getStackTraceAsString(t);
return !errorStackTrace.contains(BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX) &&
!errorStackTrace.contains(TOO_LATE_MESSAGE_SUFFIX);
}
}
}
Loading

0 comments on commit 82eea13

Please sign in to comment.