Skip to content

Commit

Permalink
fix according to reviewer idea
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston committed Mar 20, 2023
1 parent 8b31970 commit 82b6e20
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.ThreadUtils;

/**
* A {@link DataPusher} that is responsible for sending data to remote
* shuffle servers asynchronously.
*/
public class DataPusher implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(DataPusher.class);

Expand All @@ -49,7 +54,7 @@ public class DataPusher implements Closeable {
private final Map<String, Set<Long>> taskToSuccessBlockIds;
// Must be thread safe
private final Map<String, Set<Long>> taskToFailedBlockIds;
private String appId;
private String rssAppId;
// Must be thread safe
private final Set<String> failedTaskIds;

Expand All @@ -74,13 +79,15 @@ public DataPusher(ShuffleWriteClient shuffleWriteClient,
}

public CompletableFuture<Long> send(AddBlockEvent event) {
assert appId != null;
if (rssAppId == null) {
throw new RssException("RssAppId should be set.");
}
return CompletableFuture.supplyAsync(() -> {
String taskId = event.getTaskId();
List<ShuffleBlockInfo> shuffleBlockInfoList = event.getShuffleDataInfoList();
try {
SendShuffleDataResult result = shuffleWriteClient.sendShuffleData(
appId,
rssAppId,
shuffleBlockInfoList,
() -> !isValidTask(taskId)
);
Expand Down Expand Up @@ -113,8 +120,8 @@ public boolean isValidTask(String taskId) {
return !failedTaskIds.contains(taskId);
}

public void setAppId(String appId) {
this.appId = appId;
public void setRssAppId(String rssAppId) {
this.rssAppId = rssAppId;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockI
if (totalSize > sendSizeLimit) {
LOG.info("Build event with " + shuffleBlockInfosPerEvent.size()
+ " blocks and " + totalSize + " bytes");
// Use final temporary variables for closures
final long _memoryUsed = memoryUsed;
events.add(
new AddBlockEvent(taskId, shuffleBlockInfosPerEvent, () -> freeAllocatedMemory(_memoryUsed))
Expand All @@ -311,6 +312,7 @@ public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockI
if (!shuffleBlockInfosPerEvent.isEmpty()) {
LOG.info("Build event with " + shuffleBlockInfosPerEvent.size()
+ " blocks and " + totalSize + " bytes");
// Use final temporary variables for closures
final long _memoryUsed = memoryUsed;
events.add(
new AddBlockEvent(taskId, shuffleBlockInfosPerEvent, () -> freeAllocatedMemory(_memoryUsed))
Expand All @@ -328,7 +330,8 @@ public long spill(long size, MemoryConsumer trigger) {
try {
allOfFutures.get(memorySpillTimeoutSec, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
// ignore this.
// A best effort strategy to wait.
// If timeout exception occurs, the underlying tasks won't be cancelled.
} finally {
long releasedSize = futures.stream().filter(x -> x.isDone()).mapToLong(x -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testSendData() throws ExecutionException, InterruptedException {
1,
2
);
dataPusher.setAppId("testSendData_appId");
dataPusher.setRssAppId("testSendData_appId");

// sync send
AddBlockEvent event = new AddBlockEvent("taskId", Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,7 @@ public void spillTest() {

// case1. all events are flushed within normal time.
long releasedSize = spyManager.spill(1000, mock(WriteBufferManager.class));
assertEquals(
64,
releasedSize
);
assertEquals(64, releasedSize);

// case2. partial events are not flushed within normal time.
// when calling spill func, 2 events will be spilled.
Expand All @@ -272,10 +269,7 @@ public void spillTest() {
return event.getShuffleDataInfoList().stream().mapToLong(x -> x.getFreeMemory()).sum();
}));
releasedSize = spyManager.spill(1000, mock(WriteBufferManager.class));
assertEquals(
32,
releasedSize
);
assertEquals(32, releasedSize);
assertEquals(32, spyManager.getUsedBytes());
Awaitility.await().timeout(3, TimeUnit.SECONDS).until(() -> spyManager.getUsedBytes() == 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<

if (id.get() == null) {
id.compareAndSet(null, SparkEnv.get().conf().getAppId() + "_" + uuid);
dataPusher.setAppId(id.get());
dataPusher.setRssAppId(id.get());
}
LOG.info("Generate application id used in rss: " + id.get());

Expand Down

0 comments on commit 82b6e20

Please sign in to comment.