[Improvement] Refactor DefaultFlushEventHandler and ShuffleFlushManager#flushToFile #1459

Closed
2 of 3 tasks
zuston opened this issue Jan 17, 2024 · 0 comments · Fixed by #1461

Comments

@zuston
Member

zuston commented Jan 17, 2024

Code of Conduct

Search before asking

  • I have searched in the issues and found no similar issues.

What would you like to be improved?

In #775, separate flush thread pools were introduced for different storage types. However, I'm not satisfied with the current implementation.

In DefaultFlushEventHandler, each ShuffleDataFlushEvent is consumed by ShuffleFlushManager#flushToFile. However, flushToFile modifies the internal state of the event, which is not appropriate.

Ideally, flushToFile should only flush the event or decide whether it should be retried or discarded; all event state should be modified and maintained in DefaultFlushEventHandler. Here is the relevant code:

```java
private void flushToFile(ShuffleDataFlushEvent event) {
  long start = System.currentTimeMillis();
  boolean writeSuccess = false;

  try {
    if (!event.isValid()) {
      LOG.warn(
          "AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
      return;
    }

    List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
    if (blocks == null || blocks.isEmpty()) {
      LOG.info("There is no block to be flushed: {}", event);
      return;
    }

    Storage storage = event.getUnderStorage();
    if (storage == null) {
      LOG.error("Storage selected is null and this should not happen. event: {}", event);
      throw new EventDiscardException();
    }

    if (event.isPended()
        && System.currentTimeMillis() - event.getStartPendingTime()
            > pendingEventTimeoutSec * 1000L) {
      ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
      LOG.error(
          "Flush event cannot be flushed for {} sec, the event {} is dropped",
          pendingEventTimeoutSec,
          event);
      throw new EventDiscardException();
    }

    // ... (rest of the method not shown in this excerpt)
```

We could introduce exceptions such as EventDiscardException and EventRetryException, so that DefaultFlushEventHandler can decide how to process the event next.
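A minimal sketch of this idea, assuming two unchecked exception types named as in the proposal and a simplified event class. `FlushEvent`, `flushOrSignal`, `handle`, and the `storageAvailable` retry condition are hypothetical and are not Uniffle's actual API. The flush step only flushes or throws a signal; the handler is the only place where event state changes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical signal types: the flush step throws them, the handler decides what to do.
class EventDiscardException extends RuntimeException {}

class EventRetryException extends RuntimeException {}

// Simplified stand-in for ShuffleDataFlushEvent (assumption, not the real class).
class FlushEvent {
  final boolean valid;
  final boolean storageAvailable; // hypothetical retry condition
  int retryTimes = 0; // state owned and modified by the handler only

  FlushEvent(boolean valid, boolean storageAvailable) {
    this.valid = valid;
    this.storageAvailable = storageAvailable;
  }
}

class FlushHandlerSketch {
  final List<FlushEvent> flushed = new ArrayList<>();
  final List<FlushEvent> discarded = new ArrayList<>();
  final List<FlushEvent> retried = new ArrayList<>();

  // Flush step: never mutates the event; it either flushes or throws a signal.
  static void flushOrSignal(FlushEvent event) {
    if (!event.valid) {
      throw new EventDiscardException();
    }
    if (!event.storageAvailable) {
      throw new EventRetryException();
    }
    // ... write blocks to storage here ...
  }

  // Handler: the single place where event state and bookkeeping are updated.
  void handle(FlushEvent event) {
    try {
      flushOrSignal(event);
      flushed.add(event);
    } catch (EventDiscardException e) {
      discarded.add(event);
    } catch (EventRetryException e) {
      event.retryTimes++;
      retried.add(event);
    }
  }
}
```

Keeping the exceptions unchecked lets the flush step signal an outcome without widening its method signature, and the handler's catch blocks become the one place to update metrics, release resources, or requeue the event.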

How should we improve?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
zuston added a commit to zuston/incubator-uniffle that referenced this issue Jan 17, 2024
zuston added a commit that referenced this issue Jan 18, 2024
…pport event retry into pending queue (#1461)

### What changes were proposed in this pull request?

1. Refactor DefaultFlushEventHandler to unify the event-handling logic
2. Fix some incorrect metrics
3. Support retrying events via the pending queue
4. Fix the incorrect inFlushQueueSize
5. Introduce metrics for the underlying executor queues
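Item 3 (retrying events via the pending queue) can be sketched roughly as follows, reusing the pending-timeout check quoted in the issue (`pendingEventTimeoutSec * 1000L`). `PendingEvent`, `PendingRetrySketch`, and `onRetry` are hypothetical names, and the current time is passed in explicitly so the behavior is easy to test. This is an illustration under those assumptions, not the code merged in #1461.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical event: only tracks when it first entered the pending queue.
class PendingEvent {
  final String id;
  long startPendingTimeMs = -1; // -1 means "never pended"

  PendingEvent(String id) {
    this.id = id;
  }

  boolean isPended() {
    return startPendingTimeMs >= 0;
  }
}

class PendingRetrySketch {
  private final long pendingEventTimeoutSec;
  final Queue<PendingEvent> pendingQueue = new ArrayDeque<>();
  int droppedEventNum = 0;

  PendingRetrySketch(long pendingEventTimeoutSec) {
    this.pendingEventTimeoutSec = pendingEventTimeoutSec;
  }

  // Called when a flush attempt signals "retry": requeue unless the event has pended too long.
  void onRetry(PendingEvent event, long nowMs) {
    if (!event.isPended()) {
      event.startPendingTimeMs = nowMs; // first time this event is pended
    }
    if (nowMs - event.startPendingTimeMs > pendingEventTimeoutSec * 1000L) {
      droppedEventNum++; // mirrors the dropped-event counter in the quoted code
      return;
    }
    pendingQueue.offer(event);
  }
}
```

Recording the start time only on the first retry means the timeout bounds the total time an event spends pending, not the gap between individual retries.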

### Why are the changes needed?

Fix: #1459 #1460 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs
rickyma added a commit to rickyma/incubator-uniffle that referenced this issue Feb 23, 2024
zuston pushed a commit that referenced this issue Feb 24, 2024
…ing events (#1537)

### What changes were proposed in this pull request?

Fix a potential memory leak in exceptional scenarios when flushing events.

### Why are the changes needed?

A follow-up PR for: #1459

I found that memory may leak in exceptional scenarios when flushing events.
<img width="560" alt="WeCom screenshot_17086201359387" src="https://github.com/apache/incubator-uniffle/assets/13834479/e7b77e82-4336-4123-b59a-621346edd613">
<img width="811" alt="WeCom screenshot_1708620172540" src="https://github.com/apache/incubator-uniffle/assets/13834479/931eeb38-0c1b-4cfb-83be-485b74c10e17">
This is because the following code snippets are never executed:
```java
event.addCleanupCallback(
  () -> {
    this.clearInFlushBuffer(event.getEventId());
    spBlocks.forEach(spb -> spb.getData().release());
  }
);
```
and
```java
event.addCleanupCallback(() -> releaseMemory(event.getSize(), true, false));
```
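The general remedy for this kind of leak can be sketched as: register each cleanup callback right after the resource is reserved, and run the callbacks in a `finally` block so that unexpected exceptions still release memory. `CleanupEvent`, `CleanupSketch`, `createEvent`, and `handle` are hypothetical; `addCleanupCallback` only mirrors the name used in the snippets above on a simplified stand-in class. The sketch ignores retries: an event handed back to a pending queue would presumably need to skip cleanup until it is finally flushed or dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified event holding cleanup callbacks (hypothetical stand-in, not Uniffle's class).
class CleanupEvent {
  private final List<Runnable> cleanupCallbacks = new ArrayList<>();

  void addCleanupCallback(Runnable callback) {
    cleanupCallbacks.add(callback);
  }

  // Runs every registered callback once, then forgets them.
  void doCleanup() {
    for (Runnable callback : cleanupCallbacks) {
      callback.run();
    }
    cleanupCallbacks.clear();
  }
}

class CleanupSketch {
  long usedMemory = 0;

  // Register the release callback immediately after reserving memory,
  // before any step that might throw.
  CleanupEvent createEvent(long size) {
    CleanupEvent event = new CleanupEvent();
    usedMemory += size;
    event.addCleanupCallback(() -> usedMemory -= size);
    return event;
  }

  // Cleanup runs in finally, so success and unexpected exceptions both release memory.
  void handle(CleanupEvent event, Runnable flush) {
    try {
      flush.run();
    } finally {
      event.doCleanup();
    }
  }
}
```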


It can also keep heap memory from being released: `HybridStorageManager.eventOfUnderStorageManagers` will hold a large amount of heap memory, because `event.addCleanupCallback(() -> eventOfUnderStorageManagers.invalidate(event))` is never executed.
<img width="986" alt="WeCom screenshot_17086622982256" src="https://github.com/apache/incubator-uniffle/assets/13834479/98721112-8805-4f40-83ef-8daa463b6547">
<img width="454" alt="WeCom screenshot_17086635217020" src="https://github.com/apache/incubator-uniffle/assets/13834479/e54b9e77-33de-44dd-b727-55c41bdfc95f">



After this PR, memory no longer leaks, and the exception's stack trace looks something like this:
```
[2024-02-23 14:25:02.614] [LocalFileFlushEventThreadPool-54] [ERROR] DefaultFlushEventHandler.handleEventAndUpdateMetrics - Unexpected exceptions happened due to
java.lang.NullPointerException
        at org.apache.uniffle.server.ShuffleTaskInfo.getMaxConcurrencyPerPartitionToWrite(ShuffleTaskInfo.java:109)
        at org.apache.uniffle.server.ShuffleFlushManager.getMaxConcurrencyPerPartitionWrite(ShuffleFlushManager.java:198)
        at org.apache.uniffle.server.ShuffleFlushManager.processFlushEvent(ShuffleFlushManager.java:149)
        at org.apache.uniffle.server.DefaultFlushEventHandler.handleEventAndUpdateMetrics(DefaultFlushEventHandler.java:87)
        at org.apache.uniffle.server.DefaultFlushEventHandler.lambda$dispatchEvent$0(DefaultFlushEventHandler.java:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
```

We can discuss the above exception in a separate issue after this PR is merged. This PR focuses on fixing potential memory leaks.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.

---------

Co-authored-by: Enrico Minack <github@enrico.minack.dev>
rickyma added a commit to rickyma/incubator-uniffle that referenced this issue Apr 11, 2024
jerqi pushed a commit that referenced this issue Apr 13, 2024
…nt is dropped (#1643)

### What changes were proposed in this pull request?

Print an error log when an event is dropped.

### Why are the changes needed?

A follow-up PR for: #1461.
This makes dropped events easier to spot in the logs, which helps with troubleshooting.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unnecessary.
rickyma added a commit to rickyma/incubator-uniffle that referenced this issue Apr 15, 2024
zuston pushed a commit that referenced this issue Apr 16, 2024
…#1648)

### What changes were proposed in this pull request?

Fix incorrect variable printing in log messages.

### Why are the changes needed?

A follow-up PR for: #1461.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unnecessary.
rickyma added a commit to rickyma/incubator-uniffle that referenced this issue May 5, 2024
rickyma added a commit to rickyma/incubator-uniffle that referenced this issue May 5, 2024
jerqi pushed a commit that referenced this issue May 6, 2024
…nt is dropped (#1643)

jerqi pushed a commit that referenced this issue May 6, 2024
…#1672)

### What changes were proposed in this pull request?

Fix incorrect variable printing in log messages.

### Why are the changes needed?

Cherry pick commits #1643 and #1648 from master to branch-0.9.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unnecessary.
zuston added a commit to zuston/incubator-uniffle that referenced this issue May 27, 2024
…and support event retry into pending queue (apache#1461)

zuston pushed a commit to zuston/incubator-uniffle that referenced this issue May 27, 2024
… flushing events (apache#1537)
