
[SPARK-26590][CORE] make fetch-block-to-disk backward compatible #23510

Closed
wants to merge 6 commits

Conversation

cloud-fan
Contributor

@cloud-fan cloud-fan commented Jan 10, 2019

What changes were proposed in this pull request?

This is a follow-up of #16989

The fetch-block-to-disk feature is disabled by default, because it's not compatible with external shuffle services prior to Spark 2.2: the client sends a stream request to fetch block chunks, and old shuffle services can't support it.

This PR proposes a new approach; a sketch of the optional-flag encoding follows the list:

  1. Extend ChunkFetchRequest with an optional fetchAsStream boolean flag. It is only encoded into the message when it's true. A ChunkFetchRequest from an old client does not carry this flag, which means the flag is false for old clients.
  2. The server side takes care of the new flag in ChunkFetchRequest: if the flag is true, it returns the new chunk stream response, otherwise it returns a normal chunk fetch response.
  3. When the client side sends a ChunkFetchRequest with fetchAsStream=true, it sets up two callbacks, one for the new chunk stream response and one for the normal chunk fetch response. This is necessary because the server side may be an old version that ignores the fetchAsStream flag.
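
For illustration, here is a minimal sketch of how such an optional trailing flag stays wire-compatible (class and method names are hypothetical, not the actual Spark encoding):

```java
import io.netty.buffer.ByteBuf;

// Hypothetical sketch: an optional trailing flag that old decoders never see.
public final class OptionalFlagSketch {

  // New clients append the flag byte only when it is true, so a request with
  // fetchAsStream=false is byte-identical to what an old client sends.
  static void encode(ByteBuf buf, long streamId, int chunkIndex, boolean fetchAsStream) {
    buf.writeLong(streamId);
    buf.writeInt(chunkIndex);
    if (fetchAsStream) {
      buf.writeBoolean(true);
    }
  }

  // A new server reads the flag only if bytes remain; an old server stops
  // decoding right after chunkIndex and never looks at the extra byte.
  static boolean decodeFetchAsStream(ByteBuf buf) {
    buf.readLong(); // streamId
    buf.readInt();  // chunkIndex
    return buf.isReadable() && buf.readBoolean();
  }
}
```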

This is fully compatible:

  1. new client <-> new server: Definitely fine.
  2. old client <-> new server: the ChunkFetchRequest message doesn't carry the fetchAsStream flag, so the server treats it as a normal fetch request and returns a normal chunk fetch response.
  3. new client <-> old server: the ChunkFetchRequest message contains the fetchAsStream flag, but the old server doesn't know about it and stops reading the message right before the fetchAsStream part. The old server then returns a normal chunk fetch response, and the new client accepts it.

Note that the previous server-side changes made in #16989 are still there, so clients of Spark 2.2, 2.3, and 2.4 with fetch-block-to-disk enabled still work.

TODO: set up different versions of the shuffle service and test it.

How was this patch tested?

Existing tests.

@SparkQA

SparkQA commented Jan 10, 2019

Test build #101027 has finished for PR 23510 at commit c543c7f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 10, 2019

Test build #101028 has finished for PR 23510 at commit 348580f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class ChunkFetchStreamResponse extends AbstractResponseMessage

@SparkQA

SparkQA commented Jan 11, 2019

Test build #101052 has finished for PR 23510 at commit ae2b66a.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class ChunkFetchStreamResponse extends AbstractResponseMessage

@dongjoon-hyun
Member

Hi, @cloud-fan.

According to the PR description and contents, I'm wondering if we can additionally claim that Spark guarantees compatibility among all existing Spark versions, at least 1.6.3, 2.0.x, and 2.1.x?

cc @mridulm

@dongjoon-hyun
Member

If you don't mind, could you drop [SQL] from the PR title and JIRA issue? This PR doesn't seem to touch the SQL module.

$ git diff HEAD~1 --stat
 common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java            | 15 ++++++++--
 common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java   | 92 +++++++++++++++++++++++++++++++++++++++------------------
 common/network-common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java        | 27 +++++++++++++----
 common/network-common/src/main/java/org/apache/spark/network/protocol/ChunkFetchStreamResponse.java | 93 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 common/network-common/src/main/java/org/apache/spark/network/protocol/Message.java                  |  3 +-
 common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java           |  3 ++
 common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java   | 13 ++++----
 common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java     |  6 ++--
 common/network-common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java        |  2 +-
 common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java    |  8 ++---
 core/src/main/scala/org/apache/spark/internal/config/package.scala                                  |  4 +--
 11 files changed, 212 insertions(+), 54 deletions(-)

@cloud-fan cloud-fan changed the title [SPARK-26590][SQL][CORE] make fetch-block-to-disk backward compatible [SPARK-26590][CORE] make fetch-block-to-disk backward compatible Jan 11, 2019
@cloud-fan
Contributor Author

I'm wondering if we can claim that Spark guarantees compatibility among all existing Spark versions

Yes, it is, at least for all the releases that are still under maintenance.

try {
  entry.getValue().onFailure(entry.getKey().toString(), cause);
} catch (Exception e) {
  logger.warn("ChunkReceivedCallback.onFailure throws exception", e);
Member

This should be StreamCallback.onFailure.

Member

fetchChunk adds both a ChunkReceivedCallback and a StreamCallback for a chunk request. Will we trigger both callbacks' onFailure methods here?

Contributor Author

These two callbacks are independent; we only need to take care of one of them and discard the other.
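
For illustration, a minimal sketch of that dual-callback bookkeeping (the types and method names here are simplified assumptions, not the actual TransportResponseHandler code):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a fetchAsStream request registers two independent
// callbacks; whichever response type arrives claims its callback, and the
// entry for the other response type is simply discarded.
final class DualCallbackSketch {
  interface Callback {
    void onSuccess(Object payload);
    void onFailure(Throwable cause);
  }

  private final Map<Long, Callback> outstandingFetches = new ConcurrentHashMap<>();
  private final Map<Long, Callback> outstandingFetchAsStreams = new ConcurrentHashMap<>();

  void register(long streamChunkId, Callback chunkCallback, Callback streamCallback) {
    outstandingFetches.put(streamChunkId, chunkCallback);
    outstandingFetchAsStreams.put(streamChunkId, streamCallback);
  }

  // A normal chunk fetch response arrived (e.g. from an old server): use the
  // chunk callback and drop the now-useless stream callback.
  void onChunkFetchSuccess(long streamChunkId, Object payload) {
    outstandingFetchAsStreams.remove(streamChunkId);
    Callback cb = outstandingFetches.remove(streamChunkId);
    if (cb != null) cb.onSuccess(payload);
  }

  // A chunk stream response arrived (a new server honored fetchAsStream=true).
  void onChunkStreamResponse(long streamChunkId, Object payload) {
    outstandingFetches.remove(streamChunkId);
    Callback cb = outstandingFetchAsStreams.remove(streamChunkId);
    if (cb != null) cb.onSuccess(payload);
  }
}
```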

@@ -128,11 +128,15 @@ public void setClientId(String id) {
 * be agreed upon by client and server beforehand.
 * @param chunkIndex 0-based index of the chunk to fetch
 * @param callback Callback invoked upon successful receipt of chunk, or upon any failure.
 * @param streamCallback If it's not null, we will send a `ChunkFetchRequest` with
 *   `fetchAsStream=true`, and this callback will be used to handle the stream
 *   response.
Member

We should update the code comment of fetchChunk. It can now request a stream instead of just a single chunk.

@@ -128,11 +128,15 @@ public void setClientId(String id) {
 * be agreed upon by client and server beforehand.
 * @param chunkIndex 0-based index of the chunk to fetch
 * @param callback Callback invoked upon successful receipt of chunk, or upon any failure.
 * @param streamCallback If it's not null, we will send a `ChunkFetchRequest` with
 *   `fetchAsStream=true`, and this callback will be used to handle the stream
 *   response.
 */
public void fetchChunk(
Member

Not a big deal, but maybe rename it to fetchChunkOrStream?

Contributor Author

We still fetch a chunk; it's just that the chunk may be returned as a stream.

 */
public void fetchChunk(
    long streamId,
    int chunkIndex,
    ChunkReceivedCallback callback) {
    ChunkReceivedCallback callback,
    StreamCallback streamCallback) {
  if (logger.isDebugEnabled()) {
    logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
  }
Member

Should we have something in the log to show that this is also a stream request, in case streamCallback != null?

Contributor Author

It depends on how you interpret it. We can say that this is a special chunk fetch request, and the server side can return a stream response for it.

@SparkQA

SparkQA commented Jan 11, 2019

Test build #101065 has finished for PR 23510 at commit 3e17be6.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class ChunkFetchStreamResponse extends AbstractResponseMessage

@viirya
Member

viirya commented Jan 11, 2019

retest this please.

@SparkQA

SparkQA commented Jan 11, 2019

Test build #101074 has finished for PR 23510 at commit 3e17be6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class ChunkFetchStreamResponse extends AbstractResponseMessage

 * sender. The receiver is expected to set a temporary channel handler that will consume the
 * number of bytes this message says the stream has.
 */
public final class ChunkFetchStreamResponse extends AbstractResponseMessage {
Contributor Author

This is very similar to `StreamResponse`, except that here we use `StreamChunkId streamChunkId` instead of `String streamId`.
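
For context, a hedged sketch of what such a message could look like on the wire; the field set is an assumption based on this discussion, not the actual class:

```java
import io.netty.buffer.ByteBuf;

// Hedged sketch: a stream-style response keyed by a (streamId, chunkIndex)
// pair instead of a String streamId. The byteCount tells the receiver how
// many raw stream bytes follow this header.
final class ChunkFetchStreamResponseSketch {
  final long streamId;
  final int chunkIndex;
  final long byteCount;

  ChunkFetchStreamResponseSketch(long streamId, int chunkIndex, long byteCount) {
    this.streamId = streamId;
    this.chunkIndex = chunkIndex;
    this.byteCount = byteCount;
  }

  void encode(ByteBuf buf) {
    buf.writeLong(streamId);
    buf.writeInt(chunkIndex);
    buf.writeLong(byteCount);
  }

  static ChunkFetchStreamResponseSketch decode(ByteBuf buf) {
    return new ChunkFetchStreamResponseSketch(buf.readLong(), buf.readInt(), buf.readLong());
  }
}
```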

@SparkQA

SparkQA commented Jan 11, 2019

Test build #101086 has finished for PR 23510 at commit 9c261c5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 11, 2019

Test build #101084 has finished for PR 23510 at commit 4a12894.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 11, 2019

Test build #101085 has finished for PR 23510 at commit 5b51df4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@Ngone51 Ngone51 left a comment

So, if we have this, do we still need StreamRequest/StreamResponse?

  outstandingFetchAsStreams.remove(resp.streamChunkId);
} else if (message instanceof ChunkFetchStreamResponse) {
  ChunkFetchStreamResponse resp = (ChunkFetchStreamResponse) message;
  StreamCallback callback = outstandingFetchAsStreams.get(resp.streamChunkId);
Member

Also remove this callback from outstandingFetchAsStreams?

Contributor Author

good catch!

import org.apache.spark.network.buffer.ManagedBuffer;

/**
 * Response to {@link StreamRequest} when the stream has been successfully opened.
Member

StreamRequest or ChunkFetchRequest?

Contributor Author

ah forgot to update the comment :P

@cloud-fan
Contributor Author

cloud-fan commented Jan 14, 2019

@SparkQA

SparkQA commented Jan 14, 2019

Test build #101155 has finished for PR 23510 at commit 90f46ce.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 14, 2019

Test build #101166 has finished for PR 23510 at commit 62a7879.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

public void addFetchAsStreamRequest(
    StreamChunkId streamChunkId,
    StreamCallback callback) {
    updateTimeOfLastRequest();
Contributor

indent?

Contributor Author

Spark uses 4-space indentation for multi-line method parameters.

Contributor

I mean updateTimeOfLastRequest(); it's not a parameter, so shouldn't it use 2-space indentation?

Contributor Author

ah missed this one!

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Jan 14, 2019

Test build #101178 has finished for PR 23510 at commit 62a7879.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 14, 2019

Test build #101181 has finished for PR 23510 at commit 6e1a05b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Jan 15, 2019

@attilapiros

@tgravescs
Contributor

According to our docs we no longer maintain anything < 2.2, so why are we adding support for this?
http://spark.apache.org/versioning-policy.html

@squito
Contributor

squito commented Jan 15, 2019

Thanks for doing this @cloud-fan -- I have one high-level question before looking more carefully at the code. When you say:

new client <-> old server: the ChunkFetchRequest message contains the fetchAsStream flag, but the old server doesn't know about it and stops reading the message right before the fetchAsStream part. The old server then returns a normal chunk fetch response, and the new client accepts it.

so the client accepts it, but will the client still stream to disk, or will it fall back to fetching to memory? It seems it should be possible to stream to disk, as the server is really sending virtually the same bytes either way (just a different header, more or less). That said, I think the actual plumbing is rather complicated to update (this is something @attilapiros looked at earlier).

I can see about testing this with old versions of the shuffle server -- and especially if you think this makes stream-to-disk work even with the old shuffle server, then we can also test out shuffling large blocks too.

@cloud-fan
Contributor Author

According to our docs we no longer maintain anything < 2.2, so why are we adding support for this?

Yes, you are right, we can simply turn on this feature in Spark 3.0. But in practice, it's always better if we can be more backward compatible without overhead, even for versions that we don't maintain anymore, as users may still use them.

Another reason is that the previous approach is really hacky. It makes OneForOneStreamManager and NettyStreamManager have totally different semantics for openStream, which makes a lot of assumptions about the shuffle implementation, and is pretty fragile and confusing. This PR allows us to remove OneForOneStreamManager.openStream in the future.

@cloud-fan
Contributor Author

so the client accepts it, but will the client stream to disk still, or will it fallback to still fetching to memory?

When the old server returns a normal chunk fetch response, the new client will process it just as if it had sent a normal chunk fetch request, and put the data in memory.

It seems it should be possible to stream to disk, as the server is really sending virtually the same bytes either way (just a different header, more or less)

AFAIK the streaming response is very different from the chunk fetch response. The chunk fetch response sends the data in one message, so the client already has the data in memory by the time it receives the message. The stream response is a notice that the real data follows in many small messages, so the client has a chance to write it to disk incrementally.
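
To make the difference concrete, here is a minimal sketch of a disk-spilling stream consumer (all names are hypothetical): each small message is written out as it arrives, so the full block never has to sit in memory.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of the stream-callback side of fetch-to-disk.
final class FetchToDiskSketch {
  private final FileChannel out;

  FetchToDiskSketch(Path target) throws IOException {
    this.out = FileChannel.open(target,
        StandardOpenOption.CREATE, StandardOpenOption.WRITE);
  }

  // Called once per small data message belonging to the stream; the fragment
  // is spilled to disk immediately instead of being accumulated in memory.
  void onData(ByteBuffer buf) throws IOException {
    while (buf.hasRemaining()) {
      out.write(buf);
    }
  }

  // Called when the advertised byte count has been fully consumed.
  void onComplete() throws IOException {
    out.close();
  }
}
```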

@tgravescs
Contributor

What do you mean by "we can just simply turn on this feature in Spark 3.0"? I don't see where this is a feature that only applies to 3.0, at least according to the description. I agree it's better to be backwards compatible in general, but people should be upgrading, as there are other possible issues in older versions: correctness issues, security issues, etc.
I realize some people don't or can't upgrade quickly, though.

If it's a general improvement then it sounds OK, especially if it gives us more flexibility in the future. Do you have other specific changes in mind that will use this, like the work going on with the off-node shuffle service? Perhaps we should change the description to emphasize that, rather than backwards compatibility with versions we no longer maintain.

@cloud-fan
Contributor Author

What do you mean by "we can just simply turn on this feature in Spark 3.0"?

The fetch-block-to-disk feature was introduced in Spark 2.2 and is disabled by default, because it's incompatible with shuffle services prior to Spark 2.2. If we don't need to maintain Spark prior to 2.2 anymore, maybe we can simply turn it on by default now, even without this PR.

This is a general improvement to backward compatibility and helps us get rid of the hacky code in the future. I don't have other specific goals in mind.

@attilapiros
Contributor

Yes, I tried to solve the same issue by adding an extra attribute, remainingFrameSize, to ChunkFetchSuccess to store the size of the frame that has not been read yet (as it may be streamed to disk). If the incoming ChunkFetchSuccess body size was over spark.maxRemoteBlockSizeFetchToMem, I hijacked the reading of the whole body in TransportFrameDecoder and filled in this size (my TransportFrameDecoder did not even produce simple ByteBuf instances, but half-parsed messages, called ParsedFrames, which contained the message type and the size of the body; specific messages were then created from the ParsedFrames).

Anyway, the source is available at https://github.com/attilapiros/spark/pull/1/files#diff-fa724c37d2f4d18795dabb9124a71213 (but I doubt whether it is useful for you right now).
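
A rough sketch of the ParsedFrame idea described above (the shape is assumed from the comment, not taken from that branch):

```java
// Hypothetical sketch: instead of buffering a whole frame into one ByteBuf,
// the decoder emits a half-parsed frame once the type and size are known,
// so an oversized body can be streamed to disk rather than read into memory.
final class ParsedFrameSketch {
  final byte messageType;        // decoded from the frame header
  final long remainingFrameSize; // body bytes not yet read from the channel

  ParsedFrameSketch(byte messageType, long remainingFrameSize) {
    this.messageType = messageType;
    this.remainingFrameSize = remainingFrameSize;
  }

  // Over the configured threshold (cf. spark.maxRemoteBlockSizeFetchToMem),
  // the body would be left unread here and consumed incrementally downstream.
  boolean shouldStreamToDisk(long maxRemoteBlockSizeFetchToMem) {
    return remainingFrameSize > maxRemoteBlockSizeFetchToMem;
  }
}
```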

@tgravescs
Contributor

Yeah, I would be in favor of turning the config on by default in 3.0; plus, as mentioned, we don't maintain < 2.2 anymore.

@squito
Contributor

squito commented Jan 17, 2019

I kinda agree with Tom after thinking about this a bit more, that maybe it's not worth adding. The worry I have about trying to make it backwards compatible is that we might not test it regularly and it could get inadvertently broken later on. I guess I'm fine either way.

AFAIK the streaming response is very different from the chunk fetch response.

not really that different -- there is a small header (which is different in each case), followed by the bulk of the response, which is the actual data of the shuffle block (the same in both cases). Now, the client does very different things with that response based on the first header -- the data is always a stream at some level, but the client may decide whether or not to buffer it all into memory.

@cloud-fan
Contributor Author

makes sense, I'm closing it

@cloud-fan cloud-fan closed this Jan 23, 2019