[SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks #29855

Closed · wants to merge 29 commits
Commits
f144936 LIHADOOP-48527 Magnet shuffle service block transfer netty protocol (Victsm, May 9, 2020)
3bb084a LIHADOOP-53388 Magnet: Fix a bug with calculating bitmap encoded length (zhouyejoe, May 11, 2020)
0781541 LIHADOOP-53438 Using different appId for the tests in RemoteBlockPush… (otterc, May 12, 2020)
7adf227 LIHADOOP-53496 Not logging all block push exceptions on the client (otterc, May 15, 2020)
ee75ee9 LIHADOOP-53700 Separate configuration for caching the merged index fi… (zhouyejoe, Jun 1, 2020)
d1f36c0 LIHADOOP-53940 Logging the data file and index file path when shuffle… (otterc, Jun 10, 2020)
aa124b4 LIHADOOP-54059 LIHADOOP-53496 Handle the inconsistencies between loc… (otterc, Jun 15, 2020)
3f1fb0c LIHADOOP-54379 Sorting the disks both on shuffle service and executors (otterc, Jun 24, 2020)
bfcb070 LIHADOOP-54370 Not to retry on certain exceptions when pushing blocks (otterc, Jun 24, 2020)
415a7d5 LIHADOOP-52494 Magnet fallback to origin shuffle blocks when fetch of… (otterc, Jul 24, 2020)
4381ff3 LIHADOOP-55372 reduced the default for minChunkSizeInMergedShuffleFile (otterc, Aug 26, 2020)
dd9958b LIHADOOP-55315 Avoid network when fetching merged shuffle file in loc… (zhouyejoe, Sep 9, 2020)
021dea4 LIHADOOP-55654 Duplicate application init calls trigger NPE and wrong… (zhouyejoe, Sep 12, 2020)
5ce02d3 Prune changes that should go into a later PR. (Victsm, Sep 23, 2020)
90d6329 Further prune changes that should go into a later PR. (Victsm, Sep 23, 2020)
2bdf800 Fix review comments. (Victsm, Sep 23, 2020)
3e9e9e1 Fix unit test failure. (Victsm, Sep 23, 2020)
eb366d4 Address review comments. (Victsm, Sep 25, 2020)
7a6ab15 Add more comment. (Victsm, Sep 25, 2020)
85b0de8 Fix style issue. (Victsm, Sep 25, 2020)
7b48c50 Add more clarifying comments. (Victsm, Sep 29, 2020)
9f00cc3 Further optimize the serde of bitmap (Victsm, Oct 1, 2020)
db36f3f Update common/network-shuffle/src/test/java/org/apache/spark/network/… (Victsm, Oct 2, 2020)
e604686 Address additional review comments (Victsm, Oct 10, 2020)
b45b190 Additional cleanups. (Victsm, Oct 10, 2020)
f11eb9b Fix additional review comments. (Victsm, Oct 11, 2020)
691635e Fix additional review comments (Victsm, Oct 12, 2020)
f016b39 Fix additional review comments. (Victsm, Oct 12, 2020)
2c95f18 Fix styling issue (Victsm, Oct 13, 2020)
4 changes: 4 additions & 0 deletions common/network-common/pom.xml
@@ -91,6 +91,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
@@ -17,9 +17,11 @@

package org.apache.spark.network.protocol;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import io.netty.buffer.ByteBuf;
import org.roaringbitmap.RoaringBitmap;

/** Provides a canonical set of Encoders for simple types. */
public class Encoders {
@@ -44,6 +46,40 @@ public static String decode(ByteBuf buf) {
}
}

/** Bitmaps are encoded as RoaringBitmap's serialized bytes; the format is self-describing. */
public static class Bitmaps {
public static int encodedLength(RoaringBitmap b) {
// Compress the bitmap before serializing it. Note that since BlockTransferMessage
// needs to invoke encodedLength first to figure out the length for the ByteBuf, it
// guarantees that the bitmap will always be compressed before being serialized.
b.trim();
b.runOptimize();
return b.serializedSizeInBytes();
}

public static void encode(ByteBuf buf, RoaringBitmap b) {
int encodedLength = b.serializedSizeInBytes();
// RoaringBitmap requires nio ByteBuffer for serde. We expose the netty ByteBuf as a nio
// ByteBuffer. Here, we need to explicitly manage the index so we can write into the
// ByteBuffer, and the write is reflected in the underneath ByteBuf.
b.serialize(buf.nioBuffer(buf.writerIndex(), encodedLength));
buf.writerIndex(buf.writerIndex() + encodedLength);
}

public static RoaringBitmap decode(ByteBuf buf) {
RoaringBitmap bitmap = new RoaringBitmap();
try {
bitmap.deserialize(buf.nioBuffer());
// RoaringBitmap deserialize does not advance the reader index of the underlying ByteBuf.
// Manually update the index here.
buf.readerIndex(buf.readerIndex() + bitmap.serializedSizeInBytes());
} catch (IOException e) {
throw new RuntimeException("Exception while decoding bitmap", e);
}
return bitmap;
}
}

/** Byte arrays are encoded with their length followed by bytes. */
public static class ByteArrays {
public static int encodedLength(byte[] arr) {
@@ -135,4 +171,31 @@ public static long[] decode(ByteBuf buf) {
return longs;
}
}

/** Bitmap arrays are encoded with the number of bitmaps followed by per-Bitmap encoding. */
public static class BitmapArrays {
public static int encodedLength(RoaringBitmap[] bitmaps) {
int totalLength = 4; // 4 bytes to store the number of bitmaps
for (RoaringBitmap b : bitmaps) {
totalLength += Bitmaps.encodedLength(b);
}
return totalLength;
}

public static void encode(ByteBuf buf, RoaringBitmap[] bitmaps) {
buf.writeInt(bitmaps.length);
for (RoaringBitmap b : bitmaps) {
Bitmaps.encode(buf, b);
}
}

public static RoaringBitmap[] decode(ByteBuf buf) {
int numBitmaps = buf.readInt();
RoaringBitmap[] bitmaps = new RoaringBitmap[numBitmaps];
for (int i = 0; i < bitmaps.length; i++) {
bitmaps[i] = Bitmaps.decode(buf);
}
return bitmaps;
}
}
}
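
For context, here is a minimal round-trip sketch of the bitmap encoders added above. It is not part of the PR; the demo class name and sample values are made up, and it assumes the Netty and RoaringBitmap artifacts already declared for common/network-common:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.spark.network.protocol.Encoders;
import org.roaringbitmap.RoaringBitmap;

public class BitmapEncodersDemo {
  public static void main(String[] args) {
    RoaringBitmap[] bitmaps = {
      RoaringBitmap.bitmapOf(1, 5, 42),
      RoaringBitmap.bitmapOf(0, 100_000)
    };
    // encodedLength must run before encode: it trims and run-optimizes each
    // bitmap, so the size it reports matches the bytes encode() writes later.
    int len = Encoders.BitmapArrays.encodedLength(bitmaps);
    ByteBuf buf = Unpooled.buffer(len);
    Encoders.BitmapArrays.encode(buf, bitmaps);  // writes the count, then each bitmap
    RoaringBitmap[] decoded = Encoders.BitmapArrays.decode(buf);
    System.out.println(decoded[1].contains(100_000));  // true
  }
}
```

Note the ordering contract the sketch relies on: BlockTransferMessage invokes encodedLength to size the ByteBuf before encode runs, which is what guarantees the bitmap is always compressed before serialization.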
9 changes: 9 additions & 0 deletions common/network-shuffle/pom.xml
@@ -57,6 +57,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
@@ -93,6 +97,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
@@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
@@ -135,4 +136,24 @@ public void onFailure(Throwable t) {
hostLocalDirsCompletable.completeExceptionally(e);
}
}

/**
* Push a sequence of shuffle blocks in a best-effort manner to a remote node asynchronously.
* These shuffle blocks, along with blocks pushed by other clients, will be merged on the
* destination node into per-shuffle-partition merged shuffle files.
*
* @param host the host of the remote node.
* @param port the port of the remote node.
* @param blockIds block ids to be pushed
* @param buffers buffers to be pushed
* @param listener the listener to receive block push status.
*/
public void pushBlocks(
String host,
int port,
String[] blockIds,
ManagedBuffer[] buffers,
BlockFetchingListener listener) {
throw new UnsupportedOperationException();
}
}
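
Since pushBlocks only defines the contract here (the base implementation throws UnsupportedOperationException until a later PR supplies one), a hedged sketch of what a caller could look like; the wrapper class, helper name, host, and port below are hypothetical, while the BlockFetchingListener callbacks are the real interface reused for push status per the signature above:

```java
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.BlockStoreClient;

public class PushBlocksSketch {
  // Hypothetical helper: pushes the given blocks and logs per-block status.
  static void pushWithLogging(BlockStoreClient client, String host, int port,
      String[] blockIds, ManagedBuffer[] buffers) {
    client.pushBlocks(host, port, blockIds, buffers, new BlockFetchingListener() {
      @Override
      public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        // Invoked once per block the remote shuffle service accepted.
        System.out.println("Pushed " + blockId);
      }
      @Override
      public void onBlockFetchFailure(String blockId, Throwable t) {
        // Invoked per block that failed; see the ErrorHandler below for which
        // failures are worth retrying or logging.
        System.err.println("Push failed for " + blockId + ": " + t.getMessage());
      }
    });
  }
}
```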
85 changes: 85 additions & 0 deletions common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import java.net.ConnectException;

import com.google.common.base.Throwables;

/**
* Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
* and logged.
* Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
* - remaining retries < max retries
* - exception is an IOException
 */
public interface ErrorHandler {

boolean shouldRetryError(Throwable t);

default boolean shouldLogError(Throwable t) {
return true;
}

/**
* A no-op error handler instance.
*/
ErrorHandler NOOP_ERROR_HANDLER = t -> true;

/**
* The error handler for pushing shuffle blocks to remote shuffle services.
*/
class BlockPushErrorHandler implements ErrorHandler {
/**
* String constant used for generating exception messages indicating a block to be merged
* arrives too late on the server side, and also for later checking such exceptions on the
* client side. When a block push fails because the block arrives too late, we
* will not retry pushing the block nor log the exception on the client side.
*/
public static final String TOO_LATE_MESSAGE_SUFFIX =
"received after merged shuffle is finalized";

/**
* String constant used for generating exception messages indicating the server couldn't
* append a block after all available attempts due to collision with other blocks belonging
* to the same shuffle partition, and also for later checking such exceptions on the client
* side. When a block push fails because the block couldn't be written for this reason,
* we will not log the exception on the client side.
*/
public static final String COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX =
"Couldn't find an opportunity to write block";

@Override
public boolean shouldRetryError(Throwable t) {
// If it is a connection timeout or a connection closed exception, there is no need to retry.
if (t.getCause() != null && t.getCause() instanceof ConnectException) {
return false;
}
// If the block is too late, there is no need to retry it
return !Throwables.getStackTraceAsString(t).contains(TOO_LATE_MESSAGE_SUFFIX);
}

@Override
public boolean shouldLogError(Throwable t) {
String errorStackTrace = Throwables.getStackTraceAsString(t);
return !errorStackTrace.contains(COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX) &&
!errorStackTrace.contains(TOO_LATE_MESSAGE_SUFFIX);
}
}
}
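
To make the two string constants concrete, a small hypothetical probe of BlockPushErrorHandler's classification; the demo class and the block id in the message are invented, while the "too late" text matches TOO_LATE_MESSAGE_SUFFIX above:

```java
import java.io.IOException;
import org.apache.spark.network.shuffle.ErrorHandler;

public class BlockPushErrorHandlerDemo {
  public static void main(String[] args) {
    ErrorHandler handler = new ErrorHandler.BlockPushErrorHandler();

    // A push rejected because the merged shuffle was already finalized:
    // retrying can never succeed, and logging it would just be noise.
    Throwable tooLate = new RuntimeException(
        "Block shufflePush_0_0_0 received after merged shuffle is finalized");
    System.out.println(handler.shouldRetryError(tooLate)); // false
    System.out.println(handler.shouldLogError(tooLate));   // false

    // A generic I/O failure carries no such marker, so it stays
    // retryable and loggable.
    Throwable ioFailure = new IOException("Connection reset by peer");
    System.out.println(handler.shouldRetryError(ioFailure)); // true
    System.out.println(handler.shouldLogError(ioFailure));   // true
  }
}
```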