[#1608] feat: Introduce ExpiringCloseableSupplier and refactor ShuffleManagerClient creation #1838

Merged
merged 28 commits on Jul 26, 2024
Changes from 4 commits

Commits (28)
fe994f6
[#1608] refactor: Reuse ShuffleManageClient in ShuffleReader and Shuf…
xumanbu Jun 26, 2024
2e557ea
fix spark2 & refactor RssFetchFailedIterator
xumanbu Jun 27, 2024
38831c9
fix findbugs
xumanbu Jun 27, 2024
35e9a56
fix code style
xumanbu Jun 27, 2024
7fb22ce
lazyShuffleManagerClient support
xumanbu Jun 28, 2024
8a12e0c
fix some compile bugs
xumanbu Jun 28, 2024
9924baa
add AutoCloseWrapper
xumanbu Jul 1, 2024
06dfe7a
fix test
xumanbu Jul 2, 2024
f50e466
fix code style
xumanbu Jul 2, 2024
cb495ad
AutoCloseWrapper add static run method
xumanbu Jul 2, 2024
dc02a3b
change managerClientAutoCloseWrapper create
xumanbu Jul 2, 2024
4f1b7ca
ShuffleManagerGrpcClient support rpcTimeout
xumanbu Jul 2, 2024
785e4be
fix bug
xumanbu Jul 2, 2024
2906e54
stash
xumanbu Jul 3, 2024
cd652a0
Revert "stash"
xumanbu Jul 3, 2024
8cfb044
fix bug
xumanbu Jul 3, 2024
5b8657c
fix spark2
xumanbu Jul 4, 2024
1499396
AutoCloseWrapper replaced with ExpireCloseableSupplier
xumanbu Jul 8, 2024
0bb986c
add CloseStateful interface
xumanbu Jul 24, 2024
f1cb900
Refine ExpiringCloseableSupplier
advancedxy Jul 24, 2024
f949fc7
Merge pull request #1 from advancedxy/reuse_shuffle_manage_client
xumanbu Jul 25, 2024
dc66b08
fix spark2
xumanbu Jul 25, 2024
c8a35e7
CloseStateful renamed to StatefulCloseable
xumanbu Jul 25, 2024
5d14f36
fix accessTime by findbug SE_TRANSIENT_FIELD_NOT_RESTORED
xumanbu Jul 25, 2024
88a8a6c
fix default rpcTimeout 60s
xumanbu Jul 25, 2024
6e525de
opt accessTime
xumanbu Jul 25, 2024
2d657b1
purge ShuffleManagerClientFactory fun
xumanbu Jul 25, 2024
02ca92c
fix SE_TRANSIENT_FIELD_NOT_RESTORED
xumanbu Jul 26, 2024
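
For context on the title: the PR replaces the earlier AutoCloseWrapper with an expiring, closeable supplier, so a ShuffleManagerClient can be created lazily, reused across calls, and closed once it has been idle. The sketch below is only an illustration of that pattern under assumed names (`factory`, `expiryIntervalMs`, `accessTime`); it is not the class added by this PR.

```java
// Illustrative sketch of the expiring-supplier pattern; not the PR's actual class.
import java.io.Closeable;
import java.io.IOException;
import java.util.function.Supplier;

public class ExpiringCloseableSupplierSketch<T extends Closeable> implements Supplier<T>, Closeable {
  private final Supplier<T> factory;   // creates a fresh delegate, e.g. a ShuffleManagerClient
  private final long expiryIntervalMs; // idle time after which the delegate is closed
  private volatile T delegate;
  private volatile long accessTime;

  public ExpiringCloseableSupplierSketch(Supplier<T> factory, long expiryIntervalMs) {
    this.factory = factory;
    this.expiryIntervalMs = expiryIntervalMs;
  }

  @Override
  public synchronized T get() {
    long now = System.currentTimeMillis();
    // Close a delegate that has sat idle longer than the expiry interval.
    if (delegate != null && now - accessTime > expiryIntervalMs) {
      close();
    }
    if (delegate == null) {
      delegate = factory.get(); // lazy creation on first (or renewed) access
    }
    accessTime = now;
    return delegate;
  }

  @Override
  public synchronized void close() {
    if (delegate != null) {
      try {
        delegate.close();
      } catch (IOException e) {
        // A real implementation would log this; the delegate is recreated on the next get().
      }
      delegate = null;
    }
  }
}
```

Callers can then hold the supplier and call `get()` whenever they need a client, instead of opening and closing a gRPC connection per report, which is the bottleneck noted in the code removed below.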
@@ -17,7 +17,6 @@

package org.apache.spark.shuffle.reader;

import java.io.IOException;
import java.util.Objects;

import scala.Product2;
@@ -30,10 +29,8 @@
import org.slf4j.LoggerFactory;

import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;

@@ -52,8 +49,7 @@ public static class Builder {
private int shuffleId;
private int partitionId;
private int stageAttemptId;
private String reportServerHost;
private int reportServerPort;
private ShuffleManagerClient shuffleManagerClient;

private Builder() {}

@@ -77,19 +73,13 @@ Builder stageAttemptId(int stageAttemptId) {
return this;
}

Builder reportServerHost(String host) {
this.reportServerHost = host;
return this;
}

Builder port(int port) {
this.reportServerPort = port;
Builder shuffleManagerClient(ShuffleManagerClient shuffleManagerClient) {
this.shuffleManagerClient = shuffleManagerClient;
return this;
}

<K, C> RssFetchFailedIterator<K, C> build(Iterator<Product2<K, C>> iter) {
Objects.requireNonNull(this.appId);
Objects.requireNonNull(this.reportServerHost);
return new RssFetchFailedIterator<>(this, iter);
}
}
@@ -98,37 +88,23 @@ static Builder newBuilder() {
return new Builder();
}

private static ShuffleManagerClient createShuffleManagerClient(String host, int port)
throws IOException {
ClientType grpc = ClientType.GRPC;
// host is passed from spark.driver.bindAddress, which would be set when SparkContext is
// constructed.
return ShuffleManagerClientFactory.getInstance().createShuffleManagerClient(grpc, host, port);
}

private RssException generateFetchFailedIfNecessary(RssFetchFailedException e) {
String driver = builder.reportServerHost;
int port = builder.reportServerPort;
// todo: reuse this manager client if this is a bottleneck.
try (ShuffleManagerClient client = createShuffleManagerClient(driver, port)) {
RssReportShuffleFetchFailureRequest req =
new RssReportShuffleFetchFailureRequest(
builder.appId,
builder.shuffleId,
builder.stageAttemptId,
builder.partitionId,
e.getMessage());
RssReportShuffleFetchFailureResponse response = client.reportShuffleFetchFailure(req);
if (response.getReSubmitWholeStage()) {
// since we are going to roll out the whole stage, mapIndex shouldn't matter, hence -1 is
// provided.
FetchFailedException ffe =
RssSparkShuffleUtils.createFetchFailedException(
builder.shuffleId, -1, builder.partitionId, e);
return new RssException(ffe);
}
} catch (IOException ioe) {
LOG.info("Error closing shuffle manager client with error:", ioe);
RssReportShuffleFetchFailureRequest req =
new RssReportShuffleFetchFailureRequest(
builder.appId,
builder.shuffleId,
builder.stageAttemptId,
builder.partitionId,
e.getMessage());
RssReportShuffleFetchFailureResponse response =
builder.shuffleManagerClient.reportShuffleFetchFailure(req);
if (response.getReSubmitWholeStage()) {
// since we are going to roll out the whole stage, mapIndex shouldn't matter, hence -1 is
// provided.
FetchFailedException ffe =
RssSparkShuffleUtils.createFetchFailedException(
builder.shuffleId, -1, builder.partitionId, e);
return new RssException(ffe);
}
return e;
}
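
The net effect of this file's change: instead of building and tearing down a gRPC client inside the failure path, the iterator reports through a client injected via its builder. A hedged usage sketch of the new wiring (variable names follow RssShuffleReader; the injected shuffleManagerClient is assumed to be the shared, reusable instance this PR introduces):

```java
// Hedged sketch of how a caller would wire the refactored builder; the
// shuffleManagerClient variable is assumed to be a shared, long-lived client.
resultIter =
    RssFetchFailedIterator.newBuilder()
        .appId(appId)
        .shuffleId(shuffleId)
        .partitionId(startPartition)
        .stageAttemptId(context.stageAttemptNumber())
        .shuffleManagerClient(shuffleManagerClient)
        .build(resultIter);
```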
@@ -446,6 +446,7 @@ public <K, V> ShuffleWriter<K, V> getWriter(
this,
sparkConf,
shuffleWriteClient,
shuffleManagerClient,
Contributor

Hmm, I don't think it should be used directly here.

A more appropriate way would be to define a new method such as lazyShuffleManagerClient, which ensures the shuffle manager client is created if it is null.

Other places should be updated too, as sketched below.
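
A minimal sketch of that suggestion, assuming a hypothetical getOrCreateShuffleManagerClient() accessor on RssShuffleManager (the method name and double-checked locking are illustrative, not the PR's final shape):

```java
// Hypothetical shape of the reviewer's suggestion; RssShuffleManagerSketch stands in
// for RssShuffleManager, and rssConf is assumed to be RssSparkConfig.toRssConf(sparkConf).
import java.io.IOException;

import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;

class RssShuffleManagerSketch {
  private final RssConf rssConf;
  private volatile ShuffleManagerClient shuffleManagerClient;

  RssShuffleManagerSketch(RssConf rssConf) {
    this.rssConf = rssConf;
  }

  // Create the client on first use instead of in the constructor.
  ShuffleManagerClient getOrCreateShuffleManagerClient() throws IOException {
    if (shuffleManagerClient == null) {
      synchronized (this) {
        if (shuffleManagerClient == null) {
          String host = rssConf.getString("driver.host", "");
          int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
          shuffleManagerClient =
              ShuffleManagerClientFactory.getInstance()
                  .createShuffleManagerClient(ClientType.GRPC, host, port);
        }
      }
    }
    return shuffleManagerClient;
  }
}
```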

Contributor Author
@xumanbu Jun 27, 2024

Agreed, my original idea was the same as yours, but I found the client is already initialized in the RssShuffleManager constructor, so I took the simpler route here.

OK, I will try to implement a lazyShuffleManagerClient instead of initializing shuffleManagerClient in the RssShuffleManager constructor as it is done now.

Member

+1

Contributor Author

I have refactored getReader and getWriter, and I now understand why RssShuffleManager initializes the shuffleManagerClient in its constructor: RssShuffleClientFactory.ExtendWriteClientBuilder needs it. So I also made ExtendWriteClientBuilder.shuffleManagerClient lazy; a rough sketch follows.
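
The later commits in this PR resolve that tension by injecting a supplier rather than a ready-made client, so a builder can hold the dependency without forcing eager construction. A rough sketch of that shape (the class and method names below are assumptions, not the PR's final API):

```java
import java.util.function.Supplier;

import org.apache.uniffle.client.api.ShuffleManagerClient;

// Rough sketch of supplier-based injection into a builder; names are assumptions.
class ExtendWriteClientBuilderSketch {
  private Supplier<ShuffleManagerClient> managerClientSupplier;

  ExtendWriteClientBuilderSketch managerClientSupplier(Supplier<ShuffleManagerClient> supplier) {
    this.managerClientSupplier = supplier; // nothing is connected at build time
    return this;
  }

  ShuffleManagerClient clientForRpc() {
    // The client is materialized only when an RPC actually needs it.
    return managerClientSupplier.get();
  }
}
```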

rssHandle,
this::markFailedTask,
context,
@@ -550,7 +551,8 @@ public <K, C> ShuffleReader<K, C> getReader(
blockIdBitmap,
taskIdBitmap,
RssSparkConfig.toRssConf(sparkConf),
partitionToServers);
partitionToServers,
shuffleManagerClient);
} else {
throw new RssException("Unexpected ShuffleHandle:" + handle.getClass().getName());
}
@@ -47,6 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.util.RssClientConfig;
@@ -77,6 +78,7 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
private List<ShuffleServerInfo> shuffleServerInfoList;
private Configuration hadoopConf;
private RssConf rssConf;
private ShuffleManagerClient shuffleManagerClient;

public RssShuffleReader(
int startPartition,
@@ -90,7 +92,8 @@ public RssShuffleReader(
Roaring64NavigableMap blockIdBitmap,
Roaring64NavigableMap taskIdBitmap,
RssConf rssConf,
Map<Integer, List<ShuffleServerInfo>> partitionToServers) {
Map<Integer, List<ShuffleServerInfo>> partitionToServers,
ShuffleManagerClient shuffleManagerClient) {
this.appId = rssShuffleHandle.getAppId();
this.startPartition = startPartition;
this.endPartition = endPartition;
@@ -107,6 +110,7 @@ public RssShuffleReader(
this.hadoopConf = hadoopConf;
this.shuffleServerInfoList = (List<ShuffleServerInfo>) (partitionToServers.get(startPartition));
this.rssConf = rssConf;
this.shuffleManagerClient = shuffleManagerClient;
expectedTaskIdsBitmapFilterEnable = shuffleServerInfoList.size() > 1;
}

@@ -235,16 +239,12 @@ public BoxedUnit apply() {
// stage re-compute and shuffle manager server port are both set
if (rssConf.getBoolean(RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED)
&& rssConf.getInteger(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT, 0) > 0) {
String driver = rssConf.getString("driver.host", "");
int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
resultIter =
RssFetchFailedIterator.newBuilder()
.appId(appId)
.shuffleId(shuffleId)
.partitionId(startPartition)
.stageAttemptId(context.stageAttemptNumber())
.reportServerHost(driver)
.port(port)
.build(resultIter);
}
return resultIter;
@@ -17,7 +17,6 @@

package org.apache.spark.shuffle.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -64,17 +63,13 @@

import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssSendFailedException;
import org.apache.uniffle.common.exception.RssWaitFailedException;
@@ -113,6 +108,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private final Set<Long> blockIds = Sets.newConcurrentHashSet();
private TaskContext taskContext;
private SparkConf sparkConf;
private ShuffleManagerClient shuffleManagerClient;

public RssShuffleWriter(
String appId,
@@ -124,6 +120,7 @@ public RssShuffleWriter(
RssShuffleManager shuffleManager,
SparkConf sparkConf,
ShuffleWriteClient shuffleWriteClient,
ShuffleManagerClient shuffleManagerClient,
RssShuffleHandle<K, V, C> rssHandle,
SimpleShuffleHandleInfo shuffleHandleInfo,
TaskContext context) {
@@ -136,6 +133,7 @@
shuffleManager,
sparkConf,
shuffleWriteClient,
shuffleManagerClient,
rssHandle,
(tid) -> true,
shuffleHandleInfo,
@@ -152,6 +150,7 @@ private RssShuffleWriter(
RssShuffleManager shuffleManager,
SparkConf sparkConf,
ShuffleWriteClient shuffleWriteClient,
ShuffleManagerClient shuffleManagerClient,
RssShuffleHandle<K, V, C> rssHandle,
Function<String, Boolean> taskFailureCallback,
ShuffleHandleInfo shuffleHandleInfo,
@@ -171,6 +170,7 @@ private RssShuffleWriter(
this.bitmapSplitNum = sparkConf.get(RssSparkConfig.RSS_CLIENT_BITMAP_SPLIT_NUM);
this.serverToPartitionToBlockIds = Maps.newHashMap();
this.shuffleWriteClient = shuffleWriteClient;
this.shuffleManagerClient = shuffleManagerClient;
this.shuffleServersForData = shuffleHandleInfo.getServers();
this.partitionToServers = shuffleHandleInfo.getAvailablePartitionServersForWriter();
this.isMemoryShuffleEnabled =
@@ -189,6 +189,7 @@ public RssShuffleWriter(
RssShuffleManager shuffleManager,
SparkConf sparkConf,
ShuffleWriteClient shuffleWriteClient,
ShuffleManagerClient shuffleManagerClient,
RssShuffleHandle<K, V, C> rssHandle,
Function<String, Boolean> taskFailureCallback,
TaskContext context,
@@ -202,6 +203,7 @@ public RssShuffleWriter(
shuffleManager,
sparkConf,
shuffleWriteClient,
shuffleManagerClient,
rssHandle,
taskFailureCallback,
shuffleHandleInfo,
@@ -527,14 +529,6 @@ protected ShuffleWriteMetrics getShuffleWriteMetrics() {
return shuffleWriteMetrics;
}

private static ShuffleManagerClient createShuffleManagerClient(String host, int port)
throws IOException {
ClientType grpc = ClientType.GRPC;
// Host can be inferred from `spark.driver.bindAddress`, which would be set when SparkContext is
// constructed.
return ShuffleManagerClientFactory.getInstance().createShuffleManagerClient(grpc, host, port);
}

private void throwFetchFailedIfNecessary(Exception e) {
// The shuffleServer is registered only when a Block fails to be sent
if (e instanceof RssSendFailedException) {
@@ -549,34 +543,27 @@ private void throwFetchFailedIfNecessary(Exception e) {
taskContext.stageAttemptNumber(),
shuffleServerInfos,
e.getMessage());
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
String driver = rssConf.getString("driver.host", "");
int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
try (ShuffleManagerClient shuffleManagerClient = createShuffleManagerClient(driver, port)) {
RssReportShuffleWriteFailureResponse response =
shuffleManagerClient.reportShuffleWriteFailure(req);
if (response.getReSubmitWholeStage()) {
// The shuffle server is reassigned.
RssReassignServersRequest rssReassignServersRequest =
new RssReassignServersRequest(
taskContext.stageId(),
taskContext.stageAttemptNumber(),
shuffleId,
partitioner.numPartitions());
RssReassignServersResponse rssReassignServersResponse =
shuffleManagerClient.reassignOnStageResubmit(rssReassignServersRequest);
LOG.info(
"Whether the reassignment is successful: {}",
rssReassignServersResponse.isNeedReassign());
// since we are going to roll out the whole stage, mapIndex shouldn't matter, hence -1 is
// provided.
FetchFailedException ffe =
RssSparkShuffleUtils.createFetchFailedException(
shuffleId, -1, taskContext.stageAttemptNumber(), e);
throw new RssException(ffe);
}
} catch (IOException ioe) {
LOG.info("Error closing shuffle manager client with error:", ioe);
RssReportShuffleWriteFailureResponse response =
shuffleManagerClient.reportShuffleWriteFailure(req);
if (response.getReSubmitWholeStage()) {
// The shuffle server is reassigned.
RssReassignServersRequest rssReassignServersRequest =
new RssReassignServersRequest(
taskContext.stageId(),
taskContext.stageAttemptNumber(),
shuffleId,
partitioner.numPartitions());
RssReassignServersResponse rssReassignServersResponse =
shuffleManagerClient.reassignOnStageResubmit(rssReassignServersRequest);
LOG.info(
"Whether the reassignment is successful: {}",
rssReassignServersResponse.isNeedReassign());
// since we are going to roll out the whole stage, mapIndex shouldn't matter, hence -1 is
// provided.
FetchFailedException ffe =
RssSparkShuffleUtils.createFetchFailedException(
shuffleId, -1, taskContext.stageAttemptNumber(), e);
throw new RssException(ffe);
}
}
throw new RssException(e);
@@ -35,6 +35,7 @@
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
@@ -85,6 +86,7 @@ public void readTest() throws Exception {
rssConf.set(RssClientConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
rssConf.set(RssClientConf.RSS_INDEX_READ_LIMIT, 1000);
rssConf.set(RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE, "1000");
final ShuffleManagerClient mockShuffleManagerClient = mock(ShuffleManagerClient.class);
RssShuffleReader<String, String> rssShuffleReaderSpy =
spy(
new RssShuffleReader<>(
@@ -99,7 +101,8 @@ public void readTest() throws Exception {
blockIdBitmap,
taskIdBitmap,
rssConf,
partitionToServers));
partitionToServers,
mockShuffleManagerClient));

validateResult(rssShuffleReaderSpy.read(), expectedData, 10);
}
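
If a test needs the fetch-failure path to run against the mock, it could also stub the report call; a hedged sketch using standard Mockito stubbing (static imports of `when` and `any` plus the RssReportShuffleFetchFailureResponse import are assumed, and the response is mocked to avoid guessing its constructor):

```java
// Hedged sketch: make the mocked client answer reportShuffleFetchFailure with a
// "do not resubmit the stage" response.
RssReportShuffleFetchFailureResponse mockResponse =
    mock(RssReportShuffleFetchFailureResponse.class);
when(mockResponse.getReSubmitWholeStage()).thenReturn(false);
when(mockShuffleManagerClient.reportShuffleFetchFailure(any())).thenReturn(mockResponse);
```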