# fix(client): disable spark memory spill (#844)
### What changes were proposed in this pull request?

Disable the memory spill operation.

### Why are the changes needed?

In #714, memory spill support was introduced to resolve a deadlock. Unfortunately, that code has to be handled very carefully with respect to concurrency and data consistency, as the follow-up fix #811 shows. It still has bugs, which I will fix in the coming days.

I considered reverting #714 entirely, but part of its refactoring is still valuable, so this PR only disables the memory spill.
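For context, Spark asks each registered `MemoryConsumer` to free memory by calling `spill(long size, MemoryConsumer trigger)`; the return value reports how many bytes were actually released. "Disabling" the spill therefore reduces to the no-op override below, mirroring the change in the diff further down: returning `0` tells Spark that nothing was freed, so it reclaims memory from other consumers instead of waiting on this one.

```java
@Override
public long spill(long size, MemoryConsumer trigger) {
  // Spill is disabled: report zero bytes released so Spark's
  // TaskMemoryManager does not rely on this consumer to free memory.
  return 0L;
}
```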

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Not needed.
zuston authored Apr 26, 2023 · commit 4ce4a8d (1 parent: 47c4966)
Showing 2 changed files with 1 addition and 25 deletions.
In the client code, the now-unused imports are dropped and the spill implementation becomes a no-op:

```diff
@@ -22,11 +22,8 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 import scala.reflect.ClassTag$;
 import scala.reflect.ManifestFactory$;
@@ -324,27 +321,7 @@ public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockI
 
   @Override
   public long spill(long size, MemoryConsumer trigger) {
-    List<AddBlockEvent> events = buildBlockEvents(clear());
-    List<CompletableFuture<Long>> futures = events.stream().map(x -> spillFunc.apply(x)).collect(Collectors.toList());
-    CompletableFuture<Void> allOfFutures =
-        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
-    try {
-      allOfFutures.get(memorySpillTimeoutSec, TimeUnit.SECONDS);
-    } catch (TimeoutException timeoutException) {
-      // A best effort strategy to wait.
-      // If timeout exception occurs, the underlying tasks won't be cancelled.
-    } finally {
-      long releasedSize = futures.stream().filter(x -> x.isDone()).mapToLong(x -> {
-        try {
-          return x.get();
-        } catch (Exception e) {
-          return 0;
-        }
-      }).sum();
-      LOG.info("[taskId: {}] Spill triggered by memory consumer of {}, released memory size: {}",
-          taskId, trigger.getClass().getSimpleName(), releasedSize);
-      return releasedSize;
-    }
+    return 0L;
   }
 
   @VisibleForTesting
```
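For reference, the removed implementation followed a common best-effort pattern: kick off asynchronous flushes, wait for all of them up to a timeout, then count only the futures that actually completed. The sketch below is a self-contained illustration of that pattern, not the project's code; all names and values are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BestEffortWait {
  public static void main(String[] args) {
    // Two simulated flushes: one fast, one slower than the deadline.
    List<CompletableFuture<Long>> futures = Arrays.asList(
        CompletableFuture.supplyAsync(() -> 100L),
        CompletableFuture.supplyAsync(() -> {
          sleepQuietly(5_000);
          return 200L;
        }));
    try {
      // Wait at most one second for the whole batch.
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
          .get(1, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      // Best effort: stop waiting, but leave the slow task running.
    } catch (Exception e) {
      // Ignore interruption/execution failures for this sketch.
    }
    // Sum only the results that are actually available.
    long released = futures.stream()
        .filter(CompletableFuture::isDone)
        .mapToLong(f -> {
          try {
            return f.get();
          } catch (Exception e) {
            return 0L;
          }
        })
        .sum();
    System.out.println("released=" + released); // prints 100: the slow flush missed the deadline
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Note the hazard this creates: flushes that miss the deadline keep running in the background, so the reported size can disagree with what is eventually released, which is the kind of consistency concern the description cites.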
In the test code, the `spillTest` case is disabled by dropping its `@Test` annotation:

```diff
@@ -221,7 +221,6 @@ public void buildBlockEventsTest() {
     assertEquals(3, events.size());
   }
 
-  @Test
   public void spillTest() {
     SparkConf conf = getConf();
     conf.set("spark.rss.client.send.size.limit", "1000");
```
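A note on the design choice: deleting `@Test` removes `spillTest` from JUnit's run silently. A common alternative, if the intent should stay visible in test reports, is to skip the test explicitly; a sketch assuming JUnit 4, with a hypothetical class name:

```java
import org.junit.Ignore;
import org.junit.Test;

public class SpillTestSketch {
  // Skipped, not hidden: the reason and tracking reference show up in test reports.
  @Ignore("memory spill is disabled; see #844")
  @Test
  public void spillTest() {
    // original test body unchanged
  }
}
```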
