-
Notifications
You must be signed in to change notification settings - Fork 153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#1608] feat: Introduce ExpiringClosableSupplier and refactor ShuffleManagerClient creation #1838
[#1608] feat: Introduce ExpiringClosableSupplier and refactor ShuffleManagerClient creation #1838
Conversation
@zuston @advancedxy PTAL |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #1838 +/- ##
=============================================
+ Coverage 0 52.62% +52.62%
- Complexity 0 2561 +2561
=============================================
Files 0 415 +415
Lines 0 18784 +18784
Branches 0 1703 +1703
=============================================
+ Hits 0 9885 +9885
- Misses 0 8288 +8288
- Partials 0 611 +611 ☔ View full report in Codecov by Sentry. |
@@ -446,6 +446,7 @@ public <K, V> ShuffleWriter<K, V> getWriter( | |||
this, | |||
sparkConf, | |||
shuffleWriteClient, | |||
shuffleManagerClient, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, I don't think it should be called directly.
A more appreciate way should be defining a new method such as lazyShuffleManagerClient
, which will ensure
shuffle manager client is created if null.
Other places should be updated too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with u. my original idea is same with u, but I discovered it already initialized in the RssShuffleManage
construct, so I simply addressed it.
Ok. I will attempt to implement a lazyShuffleManagerClient
instead of shuffleManagerClient
initialized in new RssShuffleManage
at now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have refactored the getReader and getWriter constructs, and I understand why the RssShuffleManage
should initialize the shuffleManagerClient
in the constructor. in case of RssShuffleClientFactory.ExtendWriteClientBuilder
need it. so I also make ExtendWriteClientBuilder.shuffleManagerClient
is lazy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@advancedxy @zuston PTALA.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your contribution. Left one minor comment.
@@ -446,6 +442,7 @@ public <K, V> ShuffleWriter<K, V> getWriter( | |||
this, | |||
sparkConf, | |||
shuffleWriteClient, | |||
this::getOrCreateShuffleManagerClient, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, maybe I wasn't clear enough and mislead you. It's fine and probably better to accept a ShuffleManagerClient
in builders instead of a supplier.
Instead of passing the shuffleManagerClient
directly, we probably want to call getOrCreateShuffleManagerClient()
instead to make sure the client is actually initialized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the lifecycle of ShuffleManagerClient
may be different with the shuffle manager, if this is idle with some times, it should be closed(this is not supported now). If that, we should provide the wrapper to create or get the active ShuffleManagerClient
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the lifecycle of
ShuffleManagerClient
may be different with the shuffle manager, if this is idle with some times, it should be closed(this is not supported now). If that, we should provide the wrapper to create or get the activeShuffleManagerClient
.
agree with u. the lifecycle of ShuffleManagerClient
is major factor in guiding how to modify
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, maybe I wasn't clear enough and mislead you. It's fine and probably better to accept a
ShuffleManagerClient
in builders instead of a supplier.Instead of passing the
shuffleManagerClient
directly, we probably want to callgetOrCreateShuffleManagerClient()
instead to make sure the client is actually initialized.
Do you might be considering make ShuffleManagerClient
as a single object or a LazyShuffleManagerClient
impl which logic like a lazy val shuffleManagerClient
in Scala ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Properly not. Maybe like the following code.
class AutoClosableShuffleManagerClient extends ShuffleManagerClient {
private ShuffleManagerClient client;
private int reference = 0
public get() {
reference += 1;
if (client == null) {
client = new ShuffleManagerClient();
}
}
public void close() {
reference -= 1
if (reference == 0) {
client.close()
client = null
}
}
}
Maybe this is not right, just my simple thought.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zuston @advancedxy I have add a AutoCloseWrapper
in commit, please take a look and verify it's accuracy?
public void stop() { | ||
if (shuffleManagerClient != null) { | ||
try { | ||
shuffleManagerClient.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zuston I make the lifecycle of ShuffleManagerClient with ShuffleManage, referencee with ShuffleWriteClient.
This reverts commit 2906e54.
|
||
import org.apache.uniffle.common.exception.RssException; | ||
|
||
public class AutoCloseWrapper<T extends Closeable> implements Closeable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure about this, how about we leverage some building blocks in Guava, such as:
- CacheBuilder, which closes the the closeable when evicted.
- or A sightly modified version of ExpiringMemoizingSupplier which should close the previous closeable(Client) when refreshing to get a new one, we can implement that in Uniffle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your advice, let me reconsider.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@advancedxy I referenced the ExpiringMemoizingSupplier and used ExpireCloseableSupplier
instead of AutoCloseWrapper
in the last commit. Please check if this meets your expectations.
add some ideas:
ExpireCloseableSupplier
is compared to AutoCloseWrapper
, the ExpireCloseableSupplier
implements the Supplier interface while the AutoCloseWrapper
implements Closeable.
I believe AutoCloseWrapper
is superior, because it allows for better management of client closure when obtaining a client.
@@ -73,7 +74,11 @@ public void reportShuffleResult( | |||
RssReportShuffleResultRequest request = | |||
new RssReportShuffleResultRequest( | |||
appId, shuffleId, taskAttemptId, partitionToBlockIds, bitmapNum); | |||
shuffleManagerClient.reportShuffleResult(request); | |||
AutoCloseWrapper.run( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmm. This seems a bit of code smell here, I don't think we should introduce such complex construct to simply delay the close of closeable. It brings the burden of using AutoCloseWrapper.run
on every invocation.
7023449
to
1499396
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xumanbu thanks for your hard work. I think it's in good shape and the right direction.
The biggest issue is how to handle close from external caller for ExpiringCloseableSupplier
.
@@ -346,6 +343,7 @@ public static boolean isStageResubmitSupported() { | |||
} | |||
|
|||
public static RssException reportRssFetchFailedException( | |||
ExpireCloseableSupplier<ShuffleManagerClient> managerClientSupplier, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can simply use Supplier<ShuffleMangerClient>
here?
The delayed close is an implementation detail and doesn't have to be exposed to the API side.
One defeat of this approach is that callers may close the ShuffleManagerClient
explicitly, which might break how ExpireCloseableSupplier<ShuffleManagerClient>
work: once it's closed by a caller, it cannot be reused again, however, the supplier has no idea it's already been closed.
For that case, I think we should callout in the comment/javadoc to indicate that ShuffleManagerClients are self-closeable.
@@ -52,8 +51,7 @@ public static class Builder { | |||
private int shuffleId; | |||
private int partitionId; | |||
private int stageAttemptId; | |||
private String reportServerHost; | |||
private int reportServerPort; | |||
private ExpireCloseableSupplier<ShuffleManagerClient> managerClientSupplier; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssFetchFailedIterator.java
Outdated
Show resolved
Hide resolved
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
Show resolved
Hide resolved
client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
Show resolved
Hide resolved
common/src/main/java/org/apache/uniffle/common/util/ExpireCloseableSupplier.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/apache/uniffle/common/util/ExpireCloseableSupplier.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/apache/uniffle/common/util/ExpireCloseableSupplier.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/apache/uniffle/common/util/ExpireCloseableSupplier.java
Outdated
Show resolved
Hide resolved
@advancedxy Very thank you for helping me review the code and for your valuable suggestions. I'm still a bit confused by some comments, I need think about more and optimize the code. |
You are welcome. And if you have any further questions or problems, feel free to ask here. |
6aa6674
to
d9a1ab9
Compare
d9a1ab9
to
0bb986c
Compare
common/src/main/java/org/apache/uniffle/common/util/CloseStateful.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
@Test | ||
void testDelegateExtendClose() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this test code, looking good.
common/src/test/java/org/apache/uniffle/common/util/ExpiringCloseableSupplierTest.java
Outdated
Show resolved
Hide resolved
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
Show resolved
Hide resolved
common/src/main/java/org/apache/uniffle/common/util/ExpiringCloseableSupplier.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/apache/uniffle/common/util/ExpiringCloseableSupplier.java
Show resolved
Hide resolved
common/src/main/java/org/apache/uniffle/common/util/ExpiringCloseableSupplier.java
Outdated
Show resolved
Hide resolved
common/src/main/java/org/apache/uniffle/common/util/ExpiringCloseableSupplier.java
Show resolved
Hide resolved
common/src/main/java/org/apache/uniffle/common/util/ExpiringCloseableSupplier.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I will leave this for some more time before others have a chance to take a look at this.
common/src/main/java/org/apache/uniffle/common/util/ExpiringCloseableSupplier.java
Outdated
Show resolved
Hide resolved
this.shuffleManagerClient = | ||
ShuffleManagerClientFactory.getInstance() | ||
.createShuffleManagerClient(ClientType.GRPC, driver, port); | ||
long rpcTimeout = rssConf.getLong(RssBaseConf.RSS_CLIENT_TYPE_GRPC_TIMEOUT_MS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we unify rss.client.rpc.timeout.ms
and rss.rpc.client.type.grpc.timeout
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Emm, the rss.rpc.client.type.grpc.timeout
configuration is only used in ShuffleManageClient
, so we can remove it. instead it with rss.client.rpc.timeout.ms
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you fix this in this PR as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about address this in a follow-up PR? I think this PR is coupling too much changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, pending CI passes.
Let's wait it until tonight incase others have more comments.
Merging this since there's no objections. Others are welcome to file additional comments if any and can be addressed in a follow-up PR. |
BTW, thanks your contribution @xumanbu. |
…huffleManagerClient creation (apache#1838) ### What changes were proposed in this pull request? 1. Introduce StatefulCloseable and ExpiringClosableSupplier 2. refactor ShuffleManagerClient to leverage ExpiringClosableSupplier ### Why are the changes needed? For better code quality ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs and new UTs. (cherry picked from commit 457c865)
What changes were proposed in this pull request?
(Please outline the changes and how this PR fixes the issue.)
Why are the changes needed?
createShuffleManagerClient
function is defined in bothRssShuffleWriter
andRssFetchFailedIterator
.ShuffleManagerClient
is created inRssShuffleManager
whenshuffleManagerRpcServiceEnabled
is true. We can reuse the client inRssShuffleReader
andRssShuffleWriter
.Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UTs.