
[#808] feat(spark): ensure thread safe and data consistency when spilling #848

Merged: 2 commits merged on Jul 22, 2023

Conversation

zuston (Member) commented Apr 28, 2023

What changes were proposed in this pull request?

  1. Guarantee thread safety by only allowing spills to be triggered by the current thread.
  2. Use the same block-processing logic in RssShuffleWriter and WriteBufferManager to ensure data consistency.

Why are the changes needed?

Fix: #808

In this PR, we use two approaches to solve the concurrency problem between addRecord and the spill function:

  1. For the same thread, spill is invoked while adding records when memory is insufficient. This case is
    thread safe, so the spill is done synchronously.
  2. When spill is invoked by other consumers, it does nothing in that thread and just sets a signal so that the owner releases memory when it next adds a record.

After this, we can avoid a lock (which may cause a performance regression, as #811 did) while keeping thread safety.
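
As a rough, hedged sketch of the mechanism (the spill signature follows Spark's MemoryConsumer; spillTriggered matches the diff further down, while flushOwnedBuffers is an illustrative name rather than the exact patch):

// Sketch only, not the exact patch: how WriteBufferManager (a Spark MemoryConsumer)
// decides whether a spill request is safe to act on. flushOwnedBuffers() is an
// illustrative name for the existing synchronous flush path.
public long spill(long size, MemoryConsumer trigger) {
  if (trigger != this) {
    // Requested by another consumer: flushing here could race with addRecord,
    // so only leave a signal; the owning thread releases memory on its next record.
    spillTriggered.set(true);   // spillTriggered is an AtomicBoolean field
    return 0L;
  }
  // Requested by our own addRecord path, i.e. the same thread: flushing is safe.
  return flushOwnedBuffers();
}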

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  1. UTs

codecov-commenter commented Apr 28, 2023

Codecov Report

Merging #848 (448a5a9) into master (d6b43f0) will increase coverage by 1.28%.
The diff coverage is 90.90%.

@@             Coverage Diff              @@
##             master     #848      +/-   ##
============================================
+ Coverage     53.66%   54.94%   +1.28%     
- Complexity     2523     2539      +16     
============================================
  Files           382      362      -20     
  Lines         21672    19346    -2326     
  Branches       1795     1798       +3     
============================================
- Hits          11630    10630    -1000     
+ Misses         9338     8085    -1253     
+ Partials        704      631      -73     
Impacted Files Coverage Δ
...pache/spark/shuffle/writer/WriteBufferManager.java 86.48% <89.74%> (+6.69%) ⬆️
.../java/org/apache/spark/shuffle/RssSparkConfig.java 98.67% <100.00%> (+0.03%) ⬆️

... and 25 files with indirect coverage changes


@zuston zuston marked this pull request as draft April 28, 2023 03:11
@zuston zuston marked this pull request as ready for review April 28, 2023 03:43
zuston (Member Author) commented Apr 28, 2023

I propose a new way to keep thread safety and data consistency. PTAL @jerqi

jerqi (Contributor) commented Apr 28, 2023

Have you verified the patch in your production environment?

zuston (Member Author) commented Apr 28, 2023

> Have you verified the patch in your production environment?

No. I hope the community can review this first. If it looks OK, I will deploy it to our production environment.

@@ -65,7 +69,7 @@

private final String appId;
private final int shuffleId;
private final WriteBufferManager bufferManager;
private WriteBufferManager bufferManager;
Contributor

Why do we remove final?

Member Author

Because some existing test cases use the original RssShuffleWriter constructor. To reduce test changes, I removed the final modifier.

Contributor

You can raise a refactor PR after this one. I prefer keeping final here.

Member Author

No. This is not a refactor.

@@ -33,6 +33,7 @@ public class WriterBuffer {
private List<WrappedBuffer> buffers = Lists.newArrayList();
private int dataLength = 0;
private int memoryUsed = 0;
private boolean isAvailable = false;
Contributor

Will it be accessed by multiple threads?

Contributor

Is it necessary?

Member Author

No. The buffer will only be accessed by a single thread.

Member Author

> Is it necessary?

Yes. This is a mark to indicate that the buffer has been processed.

Contributor

Why do we need this?

Member Author

When adding a record, the memory may be insufficient, which triggers the spill operation. Once the buffer is flushed to the remote server, the current buffer held by the writer should be dropped. So this is a mark for the writer to check whether the buffer is still available.

Contributor

Do we change this value in the spill thread?

Member Author

No, it won't.

Any buffer flush in the spill function is only done in the record-adding thread.

Contributor

When do we change this value?

Member Author

Remove this part.

@@ -289,7 +355,6 @@ public void writeTest() throws Exception {
assertEquals(2, partitionToBlockIds.get(0).size());
assertEquals(2, partitionToBlockIds.get(2).size());
partitionToBlockIds.clear();
sc.stop();
Contributor

Why do we remove this?

zuston (Member Author) Apr 28, 2023

No need.

Contributor

Is it necessary?

Member Author

No. I can remove this change.

return sentBlocks;
}

/**
Member Author

Do you have any thoughts on this part? @jerqi This is the main logic.

jerqi (Contributor) commented May 4, 2023

We'd better have a config option to control whether to use this feature. It seems a little complex.

zuston (Member Author) commented May 4, 2023

> We'd better have a config option to control whether to use this feature. It seems a little complex.

It's OK.

if (wb.isAvailable()) {
// The readonly of writer buffer means the data has been sent when spilling. To avoid data lost,
// we should re-insert the record.
requestMemoryAndInsert(partitionId, serializedData, serializedDataLength, sentBlocks);
Contributor

This seems like bad code. It may cause more concurrency problems.

Member Author

This is all done in one thread. It's thread safe.

Contributor

Could we simplify the logic here? It's weird that we use recursion here.

Contributor

+1. This logic should be simplified.

It seems obvious that it could be split into two functions:
  1. Request memory: wait until enough memory is spilled.
  2. Insert records: only insert records once the memory request is satisfied.
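
A hedged sketch of what that split could look like (all names below are illustrative, not the actual patch):

// Step 1: make room. Keep spilling our own buffers synchronously until the
// requested amount of memory has been granted by the TaskMemoryManager.
private void requestMemory(long required) {
  long granted = acquireFromTaskMemoryManager(required);
  while (granted < required) {
    flushOwnedBuffers();   // releases memory held by this manager, in this thread
    granted += acquireFromTaskMemoryManager(required - granted);
  }
}

// Step 2: insert. The caller has already secured the memory, so no recursion is needed.
private void insertIntoBuffer(int partitionId, byte[] serializedData, int length) {
  buffers.computeIfAbsent(partitionId, id -> createBuffer())
      .append(serializedData, length);
}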

Member Author

Got it. I will check this PR in our internal environment. Let's freeze this until all tests finish.

Member Author

Done.

requestMemoryAndInsert(partitionId, serializedData, serializedDataLength, sentBlocks);
shuffleWriteMetrics.incRecordsWritten(1L);

if (usedBytes.get() - inSendListBytes.get() > spillSize || spillTriggered.compareAndSet(true, false)) {
Contributor

It's weird synchronization.

Member Author

Hmm, sorry, I don't get your point.

Contributor

If spillTriggered is a synchronization variable, why do we put it after ||? It's strange.

Member Author

Nice catch. It's a bug.
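
For context, one possible direction for the fix (a hedged sketch, not necessarily the actual follow-up commit): evaluate and reset the flag before the short-circuiting size check, so a pending signal is never left behind.

// With `sizeCheck || spillTriggered.compareAndSet(true, false)`, the CAS is skipped
// whenever the size check already passes, so a signal set by another consumer can
// linger and force an extra spill on a later record. Checking the flag first avoids that.
boolean requestedByOthers = spillTriggered.compareAndSet(true, false);
if (requestedByOthers || usedBytes.get() - inSendListBytes.get() > spillSize) {
  // flush owned buffers synchronously, in the record-adding thread
}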

jerqi (Contributor) May 4, 2023

You designed too complex a synchronization mechanism. That will usually be buggy.

Member Author

That would bring too much cost. Taking a lock for every record is unreasonable.

Contributor

> That would bring too much cost. Taking a lock for every record is unreasonable.

If we don't have lock conflicts, it won't bring too much cost. It is about several nanoseconds.

Member Author

An optimistic lock will. But I still think this is not complex. @advancedxy If you have time, could you help check this?

Member Author

> If spillTriggered is a synchronization variable, why do we put it after ||? It's strange.

Done.

Contributor

> An optimistic lock will. But I still think this is not complex. @advancedxy If you have time, could you help check this?

In my opinion, there is a bug here, and that reflects that the mechanism is too complex.

return 0L;
// If it is triggered by other consumers, it will spilled async by current consumer.
if (trigger != this) {
spillTriggered.set(true);
Contributor

Will the variable be accessed by multiple threads?

Member Author

Yes, it will.

zuston (Member Author) commented May 4, 2023

> We'd better have a config option to control whether to use this feature. It seems a little complex.

Done.

@zuston zuston requested a review from jerqi May 4, 2023 07:41
jerqi (Contributor) left a comment

In general, this PR doesn't propose a simple or clear method to resolve the conflicts between multiple threads. You can't use several atomic variables to guarantee thread safety unless the approach is borrowed from other successful projects and has been verified in many production environments.

@@ -33,6 +33,7 @@ public class WriterBuffer {
private List<WrappedBuffer> buffers = Lists.newArrayList();
private int dataLength = 0;
private int memoryUsed = 0;
private boolean isAvailable = false;
Contributor

When do we change this value?

// A best effort strategy to wait.
// If timeout exception occurs, the underlying tasks won't be cancelled.
} finally {
long releasedSize = futures.stream().filter(x -> x.isDone()).mapToLong(x -> {
Contributor

How do we handle RPC failures?
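
A hedged sketch of the best-effort pattern the code comment above describes (LOG, the timeout parameter, and the helper name are assumptions, not the exact patch):

// Sketch: wait for the spill futures for a bounded time; on timeout or RPC failure
// the underlying send tasks keep running, and only the futures that completed
// normally contribute to the released size.
private long waitAndSumReleased(List<CompletableFuture<Long>> futures, long timeoutSec) {
  try {
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .get(timeoutSec, TimeUnit.SECONDS);
  } catch (Exception e) {
    LOG.warn("Some spill futures did not finish in time; counting only completed ones", e);
  }
  return futures.stream()
      .filter(f -> f.isDone() && !f.isCompletedExceptionally())
      .mapToLong(f -> f.getNow(0L))
      .sum();
}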

return 0L;
}

List<CompletableFuture<Long>> futures = spillFunc.apply(clear());
Contributor

You will call the clear method from different threads. Is it thread safe?

Member Author

If I'm reading this correctly, I think you don't fully understand this design. Please see the explanation below.

zuston (Member Author) commented May 4, 2023

I want to re-explain this design so that more people can understand it. @jerqi

As we know, in the master codebase of WriteBufferManager there is no concurrency: the buffer flush operation only happens while adding records, which is owned by one thread.

But when we want to flush buffers to release memory in the spill method, it is thread unsafe, because the spill function can be invoked by the TaskMemoryManager. Does that mean the spill method and addRecord always run in different threads?
I think not, for the following reasons.

  1. When adding records, if memory is tight, the manager requests memory from the TaskMemoryManager. If no free memory is left, the manager spills the underlying MemoryConsumer, so the current WriteBufferManager's spill method is invoked as part of this process. In this case, the two methods are invoked in the same thread, so it is thread safe.
  2. If the current WriteBufferManager's spill method is invoked by the TaskMemoryManager because other memory consumers have insufficient memory, it is thread unsafe.

Based on the above, for one WriteBufferManager: if the triggering consumer is not itself, it should avoid flushing buffers because of the concurrency with adding records; if the consumer is itself, it can flush buffers safely in the same thread.

@jerqi jerqi self-requested a review May 4, 2023 11:02
requestMemory(Math.max(bufferSegmentSize, serializedDataLength));
requestMemory(required);
if (wb.isAvailable()) {
// The readonly of writer buffer means the data has been sent when spilling. To avoid data lost,
Contributor

The comment isn't clear enough. You should explain it in more detail.

Member Author

Got it.

advancedxy (Contributor) commented May 8, 2023

The idea seems simple enough and should work.
I'm a bit concerned about whether it would work well in production, since it doesn't respond to other tasks' spill requests.

But it's better than the current implementation.

@jerqi jerqi changed the title [#808] fix(spark): ensure thread safe and data consistency when spilling [#808] feat(spark): ensure thread safe and data consistency when spilling May 10, 2023
@zuston zuston requested review from advancedxy and jerqi July 20, 2023 07:45
zuston (Member Author) commented Jul 20, 2023

Sorry for the late reply. I have updated the code! PTAL @jerqi @advancedxy Thanks!

zuston (Member Author) commented Jul 21, 2023

Although this has a long history, would you mind taking a look? @leixm @smallzhongfeng @xianjingfeng

jerqi (Contributor) left a comment

LGTM. Now it's OK for me.

zuston (Member Author) commented Jul 22, 2023

If nobody has any further comments, I will merge this in the next 2 hours. Thanks ~

@zuston zuston merged commit 03b04b9 into apache:master Jul 22, 2023
@zuston zuston deleted the issue-808-2 branch July 22, 2023 13:10
zuston pushed a commit that referenced this pull request Mar 7, 2024
…sure data correctness (#1558)

### What changes were proposed in this pull request?

Verify the number of written records to enhance data accuracy.
Make sure all data records are sent by clients.
Make sure bugs like #714 will never be introduced into the code.

### Why are the changes needed?

A follow-up PR for #848.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.
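
A minimal sketch of the kind of record-count check described above (the accessors are illustrative, not the actual #1558 patch):

// Sketch: after all blocks are reported, compare the records handed to the buffer
// manager with the records actually packed into sent blocks; a mismatch means data loss.
long recordsAdded = bufferManager.getRecordCount();             // illustrative accessor
long recordsInSentBlocks = bufferManager.getSentRecordCount();  // illustrative accessor
if (recordsAdded != recordsInSentBlocks) {
  throw new IllegalStateException("Potential data loss: added " + recordsAdded
      + " records but only " + recordsInSentBlocks + " were packed into sent blocks");
}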
Development

Successfully merging this pull request may close these issues.

[Bug] NullPointerException of WriterBuffer.getData due to race condition
4 participants