[SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks #29855

Status: Closed. Wants to merge 29 commits.
Changes from 15 commits
Commits
f144936 LIHADOOP-48527 Magnet shuffle service block transfer netty protocol (Victsm, May 9, 2020)
3bb084a LIHADOOP-53388 Magnet: Fix a bug with calculating bitmap encoded length (zhouyejoe, May 11, 2020)
0781541 LIHADOOP-53438 Using different appId for the tests in RemoteBlockPush… (otterc, May 12, 2020)
7adf227 LIHADOOP-53496 Not logging all block push exceptions on the client (otterc, May 15, 2020)
ee75ee9 LIHADOOP-53700 Separate configuration for caching the merged index fi… (zhouyejoe, Jun 1, 2020)
d1f36c0 LIHADOOP-53940 Logging the data file and index file path when shuffle… (otterc, Jun 10, 2020)
aa124b4 LIHADOOP-54059 LIHADOOP-53496 Handle the inconsistencies between loc… (otterc, Jun 15, 2020)
3f1fb0c LIHADOOP-54379 Sorting the disks both on shuffle service and executors (otterc, Jun 24, 2020)
bfcb070 LIHADOOP-54370 Not to retry on certain exceptions when pushing blocks (otterc, Jun 24, 2020)
415a7d5 LIHADOOP-52494 Magnet fallback to origin shuffle blocks when fetch of… (otterc, Jul 24, 2020)
4381ff3 LIHADOOP-55372 reduced the default for minChunkSizeInMergedShuffleFile (otterc, Aug 26, 2020)
dd9958b LIHADOOP-55315 Avoid network when fetching merged shuffle file in loc… (zhouyejoe, Sep 9, 2020)
021dea4 LIHADOOP-55654 Duplicate application init calls trigger NPE and wrong… (zhouyejoe, Sep 12, 2020)
5ce02d3 Prune changes that should go into a later PR. (Victsm, Sep 23, 2020)
90d6329 Further prune changes that should go into a later PR. (Victsm, Sep 23, 2020)
2bdf800 Fix review comments. (Victsm, Sep 23, 2020)
3e9e9e1 Fix unit test failure. (Victsm, Sep 23, 2020)
eb366d4 Address review comments. (Victsm, Sep 25, 2020)
7a6ab15 Add more comment. (Victsm, Sep 25, 2020)
85b0de8 Fix style issue. (Victsm, Sep 25, 2020)
7b48c50 Add more clarifying comments. (Victsm, Sep 29, 2020)
9f00cc3 Further optimize the serde of bitmap (Victsm, Oct 1, 2020)
db36f3f Update common/network-shuffle/src/test/java/org/apache/spark/network/… (Victsm, Oct 2, 2020)
e604686 Address additional review comments (Victsm, Oct 10, 2020)
b45b190 Additional cleanups. (Victsm, Oct 10, 2020)
f11eb9b Fix additional review comments. (Victsm, Oct 11, 2020)
691635e Fix additional review comments (Victsm, Oct 12, 2020)
f016b39 Fix additional review comments. (Victsm, Oct 12, 2020)
2c95f18 Fix styling issue (Victsm, Oct 13, 2020)
4 changes: 4 additions & 0 deletions common/network-common/pom.xml
@@ -91,6 +91,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
@@ -17,9 +17,16 @@

package org.apache.spark.network.protocol;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import io.netty.buffer.ByteBuf;
import org.roaringbitmap.RoaringBitmap;

/** Provides a canonical set of Encoders for simple types. */
public class Encoders {
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
}
}

/** Bitmaps are encoded with their serialized length followed by the serialized bytes. */
public static class Bitmaps {
public static int encodedLength(RoaringBitmap b) {
// Compress the bitmap before serializing it
b.trim();
b.runOptimize();
return 4 + b.serializedSizeInBytes();
}

public static void encode(ByteBuf buf, RoaringBitmap b) {
ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
try {
b.serialize(new DataOutputStream(new OutputStream() {
ByteBuffer buffer;

OutputStream init(ByteBuffer buffer) {
this.buffer = buffer;
return this;
}

@Override
public void close() {
}

@Override
public void flush() {
}

@Override
public void write(int b) {
buffer.put((byte) b);
}

@Override
public void write(byte[] b) {
buffer.put(b);
}

@Override
public void write(byte[] b, int off, int l) {
buffer.put(b, off, l);
}
}.init(outBuffer)));
} catch (IOException e) {
throw new RuntimeException("Exception while encoding bitmap", e);
}
byte[] bytes = outBuffer.array();
buf.writeInt(bytes.length);
buf.writeBytes(bytes);
}

public static RoaringBitmap decode(ByteBuf buf) {
int length = buf.readInt();
byte[] bytes = new byte[length];
buf.readBytes(bytes);
RoaringBitmap bitmap = new RoaringBitmap();
try {
bitmap.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
} catch (IOException e) {
throw new RuntimeException("Exception while decoding bitmap", e);
}
return bitmap;
}
}

/** Byte arrays are encoded with their length followed by bytes. */
public static class ByteArrays {
public static int encodedLength(byte[] arr) {
@@ -135,4 +207,31 @@ public static long[] decode(ByteBuf buf) {
return longs;
}
}

/** Bitmap arrays are encoded with the number of bitmaps followed by per-Bitmap encoding. */
public static class BitmapArrays {
public static int encodedLength(RoaringBitmap[] bitmaps) {
int totalLength = 4;
for (RoaringBitmap b : bitmaps) {
totalLength += Bitmaps.encodedLength(b);
}
return totalLength;
}

public static void encode(ByteBuf buf, RoaringBitmap[] bitmaps) {
buf.writeInt(bitmaps.length);
for (RoaringBitmap b : bitmaps) {
Bitmaps.encode(buf, b);
}
}

public static RoaringBitmap[] decode(ByteBuf buf) {
int numBitmaps = buf.readInt();
RoaringBitmap[] bitmaps = new RoaringBitmap[numBitmaps];
for (int i = 0; i < bitmaps.length; i++) {
bitmaps[i] = Bitmaps.decode(buf);
}
return bitmaps;
}
}
}
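
For context, a minimal round-trip sketch of how the new Bitmaps encoder might be exercised. This is illustrative only; BitmapEncodingExample is a hypothetical class, not part of this diff:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.spark.network.protocol.Encoders;
import org.roaringbitmap.RoaringBitmap;

public class BitmapEncodingExample {
  public static void main(String[] args) {
    // Size the buffer via encodedLength(), which also trims and run-optimizes the bitmap.
    RoaringBitmap bitmap = RoaringBitmap.bitmapOf(1, 5, 42, 100000);
    ByteBuf buf = Unpooled.buffer(Encoders.Bitmaps.encodedLength(bitmap));
    Encoders.Bitmaps.encode(buf, bitmap);
    // decode() reads the 4-byte length prefix, then the serialized bitmap bytes.
    RoaringBitmap decoded = Encoders.Bitmaps.decode(buf);
    assert bitmap.equals(decoded);
    buf.release();
  }
}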
common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -32,6 +32,7 @@
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.*;
import org.apache.spark.network.protocol.*;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportFrameDecoder;

import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
@@ -181,6 +182,17 @@ public void onFailure(Throwable e) {
private void processStreamUpload(final UploadStream req) {
assert (req.body() == null);
try {
// Retain the original metadata buffer, since it will be used during the invocation of
// this method. It will be released later.
req.meta.retain();
// Make a copy of the original metadata buffer. In benchmarks, we noticed that we cannot
// send the original metadata buffer back to the client; otherwise, in cases where
// multiple concurrent shuffles are present, the wrong metadata might be sent back to a
// client. This is related to the eager release of the metadata buffer, i.e., we always
// release the original buffer by the time the invocation of this method ends, instead
// of when we respond to the client. This is necessary; otherwise we start seeing memory
// issues very quickly in benchmarks.
ByteBuffer meta = cloneBuffer(req.meta.nioByteBuffer());
RpcResponseCallback callback = new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
@@ -189,13 +201,17 @@ public void onSuccess(ByteBuffer response) {

@Override
public void onFailure(Throwable e) {
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
// Piggyback the request metadata as part of the exception error String, so we can
// return the metadata upon a failure without changing the existing protocol.
respond(new RpcFailure(req.requestId,
JavaUtils.encodeHeaderIntoErrorString(meta.duplicate(), e)));
req.meta.release();
}
};
TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
ByteBuffer meta = req.meta.nioByteBuffer();
StreamCallbackWithID streamHandler = rpcHandler.receiveStream(reverseClient, meta, callback);
StreamCallbackWithID streamHandler =
rpcHandler.receiveStream(reverseClient, meta.duplicate(), callback);
if (streamHandler == null) {
throw new NullPointerException("rpcHandler returned a null streamHandler");
}
@@ -209,12 +225,17 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
public void onComplete(String streamId) throws IOException {
try {
streamHandler.onComplete(streamId);
callback.onSuccess(ByteBuffer.allocate(0));
callback.onSuccess(meta.duplicate());
} catch (Exception ex) {
IOException ioExc = new IOException("Failure post-processing complete stream;" +
" failing this rpc and leaving channel active", ex);
// req.meta will be released once inside callback.onFailure. Retain it one more
// time to be released in the finally block.
req.meta.retain();
callback.onFailure(ioExc);
streamHandler.onFailure(streamId, ioExc);
} finally {
req.meta.release();
}
}

@@ -238,12 +259,26 @@ public String getID() {
}
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
try {
// It's OK to send back the original metadata buffer here, because we are still inside
// the invocation of this method.
respond(new RpcFailure(req.requestId,
JavaUtils.encodeHeaderIntoErrorString(req.meta.nioByteBuffer(), e)));
} catch (IOException ioe) {
// No exception will be thrown here. req.meta.nioByteBuffer will not throw an IOException
// because it is a NettyManagedBuffer. This try-catch block only keeps the compiler happy.
logger.error("Error in handling failure while invoking RpcHandler#receive() on RPC id "
+ req.requestId, e);
} finally {
req.meta.release();
}
// We choose to totally fail the channel, rather than trying to recover as we do in other
// cases. We don't know how many bytes of the stream the client has already sent, so it's
// not worth trying to recover.
channel.pipeline().fireExceptionCaught(e);
} finally {
// Make sure we always release the original metadata buffer by the time we exit the
// invocation of this method. Otherwise, we see memory issues fairly quickly in benchmarks.
req.meta.release();
}
}
@@ -258,6 +293,16 @@ private void processOneWayMessage(OneWayMessage req) {
}
}

/**
* Make a full copy of a nio ByteBuffer.
*/
private ByteBuffer cloneBuffer(ByteBuffer buf) {
ByteBuffer clone = ByteBuffer.allocate(buf.capacity());
clone.put(buf.duplicate());
clone.flip();
return clone;
}

/**
* Responds to a single message with some Encodable object. If a failure occurs while sending,
* it will be logged and the channel closed.
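The retain()/release() calls above follow Netty's reference-counting discipline for ByteBuf-backed buffers. A minimal standalone sketch of that discipline (illustrative only, not part of this diff):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class RefCountSketch {
  public static void main(String[] args) {
    ByteBuf meta = Unpooled.copiedBuffer(new byte[]{1, 2, 3});
    // refCnt == 1 after allocation; retain once for each additional owner,
    // e.g. an async callback that releases the buffer when it completes.
    meta.retain();    // refCnt == 2
    meta.release();   // callback done: refCnt == 1
    meta.release();   // original owner done: refCnt == 0, memory is reclaimed
  }
}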
common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -27,6 +27,7 @@
import java.util.regex.Pattern;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.SystemUtils;
@@ -80,6 +81,17 @@ public static String bytesToString(ByteBuffer b) {
return Unpooled.wrappedBuffer(b).toString(StandardCharsets.UTF_8);
}

/**
* Encode the header ByteBuffer into the error string to be returned via RpcFailure.
* Use ISO_8859_1 instead of UTF_8, because UTF_8 changes the byte content for byte values
* larger than 127, which would corrupt the encoded index inside the PushBlockStream
* message when it is decoded back.
*/
public static String encodeHeaderIntoErrorString(ByteBuffer headerBuffer, Throwable e) {
String encodedHeader = StandardCharsets.ISO_8859_1.decode(headerBuffer).toString();
return encodedHeader + Throwables.getStackTraceAsString(e);
}

/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
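Because ISO_8859_1 maps all 256 byte values one-to-one onto characters, the header bytes survive the String round trip unchanged. A sketch of how a receiver could split the header back out of the error string (decodeHeader is a hypothetical helper; it assumes the caller knows the fixed header length):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ErrorHeaderDecodeSketch {
  // Recover the header bytes from an error string produced by
  // JavaUtils.encodeHeaderIntoErrorString; the remainder is the stack-trace text.
  static ByteBuffer decodeHeader(String errorString, int headerLength) {
    byte[] raw = errorString.getBytes(StandardCharsets.ISO_8859_1);
    return ByteBuffer.wrap(raw, 0, headerLength);
  }
}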
common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
@@ -19,6 +19,7 @@

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
@@ -168,6 +169,7 @@ public static void tearDown() {
static class RpcResult {
public Set<String> successMessages;
public Set<String> errorMessages;
public Set<String> successResponses;
}

private RpcResult sendRPC(String ... commands) throws Exception {
@@ -210,6 +212,7 @@ private RpcResult sendRpcWithStream(String... streams) throws Exception {
RpcResult res = new RpcResult();
res.successMessages = Collections.synchronizedSet(new HashSet<>());
res.errorMessages = Collections.synchronizedSet(new HashSet<>());
res.successResponses = Collections.synchronizedSet(new HashSet<>());

for (String stream : streams) {
int idx = stream.lastIndexOf('/');
@@ -247,6 +250,7 @@ private static class RpcStreamCallback implements RpcResponseCallback {
@Override
public void onSuccess(ByteBuffer message) {
res.successMessages.add(streamId);
res.successResponses.add(JavaUtils.bytesToString(message));
sem.release();
}

@@ -326,6 +330,7 @@ public void sendRpcWithStreamOneAtATime() throws Exception {
RpcResult res = sendRpcWithStream(stream);
assertTrue("there were error messages!" + res.errorMessages, res.errorMessages.isEmpty());
assertEquals(Sets.newHashSet(stream), res.successMessages);
assertEquals(Sets.newHashSet(stream), res.successResponses);
}
}

@@ -336,30 +341,54 @@ public void sendRpcWithStreamConcurrently() throws Exception {
streams[i] = StreamTestHelper.STREAMS[i % StreamTestHelper.STREAMS.length];
}
RpcResult res = sendRpcWithStream(streams);
assertEquals(Sets.newHashSet(StreamTestHelper.STREAMS), res.successMessages);
Set<String> streamSet = Sets.newHashSet(StreamTestHelper.STREAMS);
assertEquals(streamSet, res.successMessages);
assertEquals(streamSet, res.successResponses);
assertTrue(res.errorMessages.isEmpty());
}

@Test
public void sendRpcWithStreamFailures() throws Exception {
// When there is a failure reading stream data, we don't try to keep the channel usable;
// we just send back a decent error message.
String failStream = "fail/exception-ondata/smallBuffer";
RpcResult exceptionInCallbackResult =
sendRpcWithStream("fail/exception-ondata/smallBuffer", "smallBuffer");
sendRpcWithStream(failStream, "smallBuffer");
assertErrorAndClosed(exceptionInCallbackResult, "Destination failed while reading stream");
assertDecodedErrorsContain(exceptionInCallbackResult.errorMessages, failStream);

failStream = "fail/null/smallBuffer";
RpcResult nullStreamHandler =
sendRpcWithStream("fail/null/smallBuffer", "smallBuffer");
sendRpcWithStream(failStream, "smallBuffer");
assertErrorAndClosed(nullStreamHandler, "Destination failed while reading stream");
assertDecodedErrorsContain(nullStreamHandler.errorMessages, failStream);

// OTOH, if there is a failure during onComplete, the channel should still be fine
failStream = "fail/exception-oncomplete/smallBuffer";
RpcResult exceptionInOnComplete =
sendRpcWithStream("fail/exception-oncomplete/smallBuffer", "smallBuffer");
sendRpcWithStream(failStream, "smallBuffer");
assertErrorsContain(exceptionInOnComplete.errorMessages,
Sets.newHashSet("Failure post-processing"));
assertDecodedErrorsContain(exceptionInOnComplete.errorMessages, failStream);
assertEquals(Sets.newHashSet("smallBuffer"), exceptionInOnComplete.successMessages);
}

private void assertDecodedErrorsContain(Set<String> errors, String contain) {
Set<String> decodedErrors = Sets.newHashSet();
for (String error : errors) {
ByteBuffer rawBuffer = ByteBuffer.wrap(error.getBytes(StandardCharsets.ISO_8859_1));
decodedErrors.add(JavaUtils.bytesToString(rawBuffer));
}
boolean foundMatch = decodedErrors.stream().anyMatch(error -> error.contains(contain));
assertTrue("Could not find decoded error containing " + contain, foundMatch);
}

private void assertErrorsContain(Set<String> errors, Set<String> contains) {
assertEquals("Expected " + contains.size() + " errors, got " + errors.size() + "errors: " +
errors, contains.size(), errors.size());
9 changes: 9 additions & 0 deletions common/network-shuffle/pom.xml
@@ -57,6 +57,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
@@ -93,6 +97,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>