[SPARK-26604][CORE] Clean up channel registration for StreamManager #23521
cc @cloud-fan
Test build #101089 has finished for PR 23521 at commit
|
@@ -127,6 +127,7 @@ private void processStreamRequest(final StreamRequest req) {
    ManagedBuffer buf;
    try {
      buf = streamManager.openStream(req.streamId);
      streamManager.registerChannel(channel, req.streamId);
I'm wondering when we should do this. There are many kinds of requests, and currently only chunk fetch request does it.
For stream request and get chunk request, they are recorded in stream manager. Registering channel for them is to make sure they are removed from stream manager when the channel is inactive. For other types of requests, I don't find they are recorded like that.
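The cleanup argument in the reply above can be illustrated with a minimal sketch. `FakeChannel` and `LeakDemoStreamManager` are hypothetical stand-ins invented for this example, not Spark classes (the real code works with Netty's `Channel`), and the bookkeeping is deliberately simplified. The only point is that a stream which was never bound to a channel survives `connectionTerminated`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a network connection.
final class FakeChannel {
  final String id;
  FakeChannel(String id) { this.id = id; }
}

// Simplified stream manager: remembers which connection, if any, owns each stream.
final class LeakDemoStreamManager {
  private static final class State {
    volatile FakeChannel owner;  // null until a channel is registered
  }

  private final Map<Long, State> streams = new ConcurrentHashMap<>();

  void registerStream(long streamId) {
    streams.put(streamId, new State());
  }

  void registerChannel(FakeChannel channel, long streamId) {
    State state = streams.get(streamId);
    if (state != null) {
      state.owner = channel;
    }
  }

  // Drops every stream owned by the closed connection; unowned streams stay behind.
  void connectionTerminated(FakeChannel channel) {
    streams.entrySet().removeIf(e -> e.getValue().owner == channel);
  }

  int numStreamStates() {
    return streams.size();
  }
}
```

In this sketch, a stream registered without a matching `registerChannel` call is never removed when the connection closes, which is the kind of leftover state the discussion is concerned with.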
Test build #101090 has finished for PR 23521 at commit
|
 * of the stream. This is similar to {@link #registerChannel(Channel, long)} method, but the
 * <code>streamId</code> argument is for the stream in response to a stream() request.
 */
public void registerChannel(Channel channel, String streamId) { }
Now things get tricky here. There are 2 different kinds of stream requests:
- to download jars and other files
- to fetch data blocks (introduced at [SPARK-19659] Fetch big blocks to disk when shuffle-read. #16989)
For which of these do we need to register the channel?
- For stream requests that fetch data blocks, the streams are registered by the RPC handler. Registering the channels therefore helps to remove the registered streams when the channels become inactive.
- For stream requests that download jars and files, there is no such stream registration.
Are you saying `TransportRequestHandler.processStreamRequest` is only used to deal with stream requests to fetch blocks?
`TransportRequestHandler.processStreamRequest` is used for both. But the streams are not registered there. They are registered by `NettyBlockRpcServer` when processing the `OpenBlocks` message.
`registerStream` is only in `OneForOneStreamManager`, it's super weird that `registerChannel` needs to be called after `registerStream`, but `registerChannel` is in the parent `StreamManager`.
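The awkwardness described in this comment can be sketched as follows. All class names here (`StubChannel`, `BaseStreamManager`, `SplitApiStreamManager`) are hypothetical and only mirror the shape under discussion: a base class whose `registerChannel` does nothing, and a subclass that is the only place able to create stream state. The silent no-op for unknown stream ids is an assumption made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a network connection.
final class StubChannel {}

// Base class: holds no per-stream state, so its hooks are no-ops.
abstract class BaseStreamManager {
  void registerChannel(StubChannel channel, long streamId) { }
  void connectionTerminated(StubChannel channel) { }
}

// Subclass: the only place where streams can actually be registered.
final class SplitApiStreamManager extends BaseStreamManager {
  private static final class StreamState {
    volatile StubChannel associatedChannel;
  }

  private final AtomicLong nextStreamId = new AtomicLong();
  private final Map<Long, StreamState> streams = new ConcurrentHashMap<>();

  long registerStream() {
    long id = nextStreamId.getAndIncrement();
    streams.put(id, new StreamState());
    return id;
  }

  @Override
  void registerChannel(StubChannel channel, long streamId) {
    StreamState state = streams.get(streamId);
    if (state != null) {
      state.associatedChannel = channel;
    }
    // Assumption: an unknown stream id is silently ignored.
  }

  @Override
  void connectionTerminated(StubChannel channel) {
    streams.values().removeIf(s -> s.associatedChannel == channel);
  }

  int numStreamStates() {
    return streams.size();
  }
}
```

With the API split this way, a caller has to know both that `registerStream` must come first and that it lives on the subclass; calling `registerChannel` too early has no effect and reports no error.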
Hmm, I agree. I think it is also good to clean this up a bit. I will do a change later.
@cloud-fan Since |
Test build #101187 has finished for PR 23521 at commit
|
Test build #101188 has started for PR 23521 at commit |
test this please |
Test build #101198 has finished for PR 23521 at commit
|
 * Associates a stream with a single client connection, which is guaranteed to be the only reader
 * of the stream. Once the connection is closed, the stream will never be used again, enabling
 * cleanup by `connectionTerminated`.
 */
public void registerChannel(Channel channel, long streamId) {
this can be private?
@@ -42,9 +42,10 @@
 * The returned ManagedBuffer will be release()'d after being written to the network.
 *
 * @param streamId id of a stream that has been previously registered with the StreamManager.
 * @param channel The connection used to serve chunk request.
let's say more about this parameter, especially how it should be used. IIUC we need to track the channel states, and do some cleanup if the channel is inactive.
Test build #101211 has finished for PR 23521 at commit
|
@cloud-fan Moved channel registration to where we register the stream. A few tests are modified.
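A minimal sketch of what "registering the channel where we register the stream" could look like is given below. `DemoChannel` and `CombinedStreamManager` are hypothetical names, and `Iterator<String>` stands in for the buffer iterator; this is not the code from the patch, only an illustration of binding the channel once, at registration time.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a network connection.
final class DemoChannel {}

final class CombinedStreamManager {
  private static final class StreamState {
    final String appId;
    final Iterator<String> buffers;
    // Set once in the constructor, so the field can be final.
    final DemoChannel associatedChannel;

    StreamState(String appId, Iterator<String> buffers, DemoChannel channel) {
      this.appId = appId;
      this.buffers = Objects.requireNonNull(buffers);
      this.associatedChannel = channel;
    }
  }

  private final AtomicLong nextStreamId = new AtomicLong();
  private final Map<Long, StreamState> streams = new ConcurrentHashMap<>();

  // The stream and its owning channel are recorded in a single step.
  long registerStream(String appId, Iterator<String> buffers, DemoChannel channel) {
    long myStreamId = nextStreamId.getAndIncrement();
    streams.put(myStreamId, new StreamState(appId, buffers, channel));
    return myStreamId;
  }

  // Removes every stream that was registered for the closed connection.
  void connectionTerminated(DemoChannel channel) {
    streams.values().removeIf(s -> s.associatedChannel == channel);
  }

  // Test-only view of how many streams are still tracked.
  int numStreamStates() {
    return streams.size();
  }
}
```

Because the channel is supplied together with the stream, there is no ordering requirement between two separate calls, and a counter such as `numStreamStates` gives tests a direct way to check that termination cleaned up.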
Test build #101243 has finished for PR 23521 at commit
|
Test build #101244 has finished for PR 23521 at commit
|
Oops... |
this.appId = appId;
this.buffers = Preconditions.checkNotNull(buffers);
this.associatedChannel = channel;
`associatedChannel` can be final now.
Done.
  return myStreamId;
}

@VisibleForTesting
public int streamStateSize() {
nit: numStreamStates
Changed.
Test build #101245 has finished for PR 23521 at commit
|
Test build #101251 has finished for PR 23521 at commit
|
retest this please. |
Test build #101269 has finished for PR 23521 at commit
|
thanks, merging to master! |
## What changes were proposed in this pull request? Now in `TransportRequestHandler.processStreamRequest`, when a stream request is processed, the stream id is not registered with the current channel in stream manager. It should do that so in case of that the channel gets terminated we can remove associated streams of stream requests too. This also cleans up channel registration in `StreamManager`. Since `StreamManager` doesn't register channel but only `OneForOneStreamManager` does it, this removes `registerChannel` from `StreamManager`. When `OneForOneStreamManager` goes to register stream, it will also register channel for the stream. ## How was this patch tested? Existing tests. Closes apache#23521 from viirya/SPARK-26604. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
I think we need to backport this into branch-2.4 |
If it can't be directly merged to branch-2.4, I can make a PR for it. |
I am taking a look, but it may not be until later today that I'd have a PR up. It's mostly clean but the 2.4 branch doesn't have the |
…treamManager ## What changes were proposed in this pull request? This is mostly a clean backport of #23521 to branch-2.4 ## How was this patch tested? I've tested this with a hack in `TransportRequestHandler` to force `ChunkFetchRequest` to get dropped. Then making a number of `ExternalShuffleClient.fetchChunk` requests (which `OpenBlocks` then `ChunkFetchRequest`) and closing out of my test harness. A heap dump later reveals that the `StreamState` references are unreachable. I haven't run this through the unit test suite, but doing that now. Wanted to get this up as I think folks are waiting for it for 2.4.1 Closes #24013 from abellina/SPARK-26604_cherry_pick_2_4. Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com> Co-authored-by: Alessandro Bellina <abellina@yahoo-inc.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
…treamManager ## What changes were proposed in this pull request? This is mostly a clean backport of #23521 to branch-2.4 ## How was this patch tested? I've tested this with a hack in `TransportRequestHandler` to force `ChunkFetchRequest` to get dropped. Then making a number of `ExternalShuffleClient.fetchChunk` requests (which `OpenBlocks` then `ChunkFetchRequest`) and closing out of my test harness. A heap dump later reveals that the `StreamState` references are unreachable. I haven't run this through the unit test suite, but doing that now. Wanted to get this up as I think folks are waiting for it for 2.4.1 Closes #24013 from abellina/SPARK-26604_cherry_pick_2_4. Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com> Co-authored-by: Alessandro Bellina <abellina@yahoo-inc.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> (cherry picked from commit 216eeec) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request? Now in `TransportRequestHandler.processStreamRequest`, when a stream request is processed, the stream id is not registered with the current channel in stream manager. It should do that so in case of that the channel gets terminated we can remove associated streams of stream requests too. This also cleans up channel registration in `StreamManager`. Since `StreamManager` doesn't register channel but only `OneForOneStreamManager` does it, this removes `registerChannel` from `StreamManager`. When `OneForOneStreamManager` goes to register stream, it will also register channel for the stream. ## How was this patch tested? Existing tests. Closes apache#23521 from viirya/SPARK-26604. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit cf133e6)
## What changes were proposed in this pull request?

Now in `TransportRequestHandler.processStreamRequest`, when a stream request is processed, the stream id is not registered with the current channel in stream manager. It should do that so in case of that the channel gets terminated we can remove associated streams of stream requests too.

This also cleans up channel registration in `StreamManager`. Since `StreamManager` doesn't register channel but only `OneForOneStreamManager` does it, this removes `registerChannel` from `StreamManager`. When `OneForOneStreamManager` goes to register stream, it will also register channel for the stream.

## How was this patch tested?

Existing tests.