Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-7860][CORE] In shuffle writer, replace MemoryMappedFile to avoid OOM #7861

Merged
merged 4 commits into from
Nov 28, 2024

Conversation

ccat3z
Copy link
Contributor

@ccat3z ccat3z commented Nov 8, 2024

What changes were proposed in this pull request?

This pr fixed #7860 by MmapFileStream extended arrow:io::InputStream. MmapFileStream will invoke MADV_DONTNEED to release previous memory when read next range of data.

How was this patch tested?

// Generate 10 partitions, each partition has about 10GB random data.
def gen(scale: Int, parts: Int) = {
  sc.parallelize(1 to (1024*1024), numSlices = 1000)
    .map(x => (x % 1000, randStr(scale * parts)))
    .repartition(parts)
    .toDF("a", "b")
    .save./* ... */
}

// Trigger shuffle spill by `repartition(50)`.
def test(parts: Int = 50) = {
  spark.read./* ... */.repartition(parts)
    .filter(expr("a < 0*rand()")) // avoid pushdown repartition
}
# Executor Memory Config
spark.executor.memory=512M
spark.yarn.executor.memoryOverhead=512M
spark.gluten.memory.offHeap.size.in.bytes=1610612736

Test Result:

impl avg time to merge spills (s) avg total spilled size of each task (MB)
read (arrow ReadableFile) 10.58706836156 9935.920098495480
mmap (open required range by MemoryMappedFile) 6.602059312420000 9935.920098495480
madv (this pr) 6.73993204562 9935.920098495480
mmap (repace madv by munmap in this pr) 6.55791399852 9935.920098495480

munmap patch in above test:

diff --git a/cpp/core/shuffle/Utils.cc b/cpp/core/shuffle/Utils.cc
index 1ceb777f1..742c53c90 100644
--- a/cpp/core/shuffle/Utils.cc
+++ b/cpp/core/shuffle/Utils.cc
@@ -243,9 +243,9 @@ void MmapFileStream::advance(int64_t length) {
 
   auto purgeLength = (pos_ - posRetain_) & pageMask;
   if (purgeLength > 0) {
-    int ret = madvise(data_ + posRetain_, purgeLength, MADV_DONTNEED);
+    int ret = munmap(data_ + posRetain_, purgeLength);
     if (ret != 0) {
-      LOG(WARNING) << "fadvise failed " << ::arrow::internal::ErrnoMessage(errno);
+      LOG(WARNING) << "munmap failed " << ::arrow::internal::ErrnoMessage(errno);
     }
     posRetain_ += purgeLength;
   }
@@ -269,7 +269,7 @@ void MmapFileStream::willNeed(int64_t length) {
 
 arrow::Status MmapFileStream::Close() {
   if (data_ != nullptr) {
-    int result = munmap(data_, size_);
+    int result = munmap(data_ + posRetain_, size_ - posRetain_);
     if (result != 0) {
       LOG(WARNING) << "munmap failed";
     }

@github-actions github-actions bot added the VELOX label Nov 8, 2024
Copy link

github-actions bot commented Nov 8, 2024

#7860

@ccat3z
Copy link
Contributor Author

ccat3z commented Nov 8, 2024

cc @kecookier

@zhztheplayer zhztheplayer changed the title [GLUTEN-7860][CORE] Replace MemoryMappedFile with ReadableFile to avoid OOM [GLUTEN-7860][CORE] In shuffle writer, replace MemoryMappedFile with ReadableFile to avoid OOM Nov 8, 2024
@kecookier
Copy link
Contributor

/Benchmark Velox

1 similar comment
@ccat3z
Copy link
Contributor Author

ccat3z commented Nov 9, 2024

/Benchmark Velox

@ccat3z ccat3z marked this pull request as ready for review November 9, 2024 03:15
@ccat3z
Copy link
Contributor Author

ccat3z commented Nov 9, 2024

/Benchmark Velox

Copy link
Member

@zhztheplayer zhztheplayer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ccat3z Do you see #7860 fixed with this approach?

I am triggering a benchmark manually.

cc @marin-ma @FelixYBW

@@ -73,7 +73,7 @@ void Spill::insertPayload(

void Spill::openSpillFile() {
if (!is_) {
GLUTEN_ASSIGN_OR_THROW(is_, arrow::io::MemoryMappedFile::Open(spillFile_, arrow::io::FileMode::READ));
GLUTEN_ASSIGN_OR_THROW(is_, arrow::io::ReadableFile::Open(spillFile_));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the API implemented with buffered read?

Not sure whether https://github.com/apache/arrow/blob/main/cpp/src/arrow/io/buffered.h may help here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spill merge needn't buffer

@marin-ma
Copy link
Contributor

I am triggering a benchmark manually.

@zhztheplayer There's no shuffle spill on jenkins. The change won't be tested.

@zhztheplayer
Copy link
Member

I am triggering a benchmark manually.

@zhztheplayer There's no shuffle spill on jenkins. The change won't be tested.

Thought we always rely on Spark-controlled spill in shuffle. Does Jenkins CI always have enough memory for all shuffle data?

@GlutenPerfBot

This comment was marked as off-topic.

@FelixYBW
Copy link
Contributor

@zhztheplayer There's no shuffle spill on jenkins. The change won't be tested.

Is it because the spill will be triggered on other operators in the pipeline? Like a sort + shuffle. Will the sort be triggered or shuffle?

@FelixYBW
Copy link
Contributor

@zhztheplayer @marin-ma can we create a query and config to test it?

@ccat3z ccat3z changed the title [GLUTEN-7860][CORE] In shuffle writer, replace MemoryMappedFile with ReadableFile to avoid OOM [GLUTEN-7860][CORE] In shuffle writer, replace MemoryMappedFile to avoid OOM Nov 18, 2024
@ccat3z ccat3z force-pushed the mmap-read-file branch 2 times, most recently from eaf10aa to 43a4f06 Compare November 18, 2024 03:34
@ccat3z
Copy link
Contributor Author

ccat3z commented Nov 18, 2024

@FelixYBW @zhztheplayer I added MmapFileStream in this pr. MmapFileStream will invoke MADV_DONTNEED to release previous memory when reading next range of data. Test approach and result has updated in PR description.

Comment on lines 230 to 233
auto fstream = std::shared_ptr<MmapFileStream>(new MmapFileStream());
fstream->fd_ = std::move(fd);
fstream->data_ = static_cast<uint8_t*>(result);
fstream->size_ = size;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use std::make_shared and set the argument through ctor?

@@ -72,4 +72,34 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(

std::shared_ptr<arrow::Buffer> zeroLengthNullBuffer();

class MmapFileStream : public arrow::io::InputStream {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add some comments to explain the usage/functionality for this class?

Copy link
Contributor

@marin-ma marin-ma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor comments. Thanks!

// to prefetch and release memory timely.
class MmapFileStream : public arrow::io::InputStream {
public:
MmapFileStream(arrow::internal::FileDescriptor fd, uint8_t* data, int64_t size)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please separate the declaration and definition. And add a blank line between two member functions.

arrow::Status Close() override;
arrow::Result<int64_t> Read(int64_t nbytes, void* out) override;
arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
bool closed() const override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

};

private:
arrow::Result<int64_t> actualReadSize(int64_t nbytes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@@ -72,4 +72,37 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(

std::shared_ptr<arrow::Buffer> zeroLengthNullBuffer();

// MmapFileStream is used to optimize sequential file reading. It uses madvise
// to prefetch and release memory timely.
class MmapFileStream : public arrow::io::InputStream {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may contribute MmapFileStream to Apache Arrow in future.

@FelixYBW
Copy link
Contributor

Thank you. Looks good solution!

Copy link
Contributor

@marin-ma marin-ma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks!

@zhztheplayer
Copy link
Member

Is it because the spill will be triggered on other operators in the pipeline? Like a sort + shuffle. Will the sort be triggered or shuffle?

So far the spill will be triggered on components holding more memory no matter it's Velox operator or shuffle. We have a basic priority setting in Spiller API and in future we can extend and use it to implement some fixed spill order.

@FelixYBW
Copy link
Contributor

Is it because the spill will be triggered on other operators in the pipeline? Like a sort + shuffle. Will the sort be triggered or shuffle?

So far the spill will be triggered on components holding more memory no matter it's Velox operator or shuffle. We have a basic priority setting in Spiller API and in future we can extend and use it to implement some fixed spill order.

So now once spill is called, all operator's spill is triggered, right?

@zhztheplayer
Copy link
Member

zhztheplayer commented Nov 20, 2024

Is it because the spill will be triggered on other operators in the pipeline? Like a sort + shuffle. Will the sort be triggered or shuffle?

So far the spill will be triggered on components holding more memory no matter it's Velox operator or shuffle. We have a basic priority setting in Spiller API and in future we can extend and use it to implement some fixed spill order.

So now once spill is called, all operator's spill is triggered, right?

We pass a target spill size to Velox API so usually the spill call stops when enough memory space is reclaimed. So a portion of the operators can be omitted in the procedure.

@FelixYBW
Copy link
Contributor

We pass a target spill size to Velox API so usually the spill call stops when enough memory space is reclaimed. So a portion of the operators can be omitted in the procedure.

Will it still call shuffle's writer's spill anyway?

@FelixYBW
Copy link
Contributor

can you resolve conflict?

@ccat3z
Copy link
Contributor Author

ccat3z commented Nov 25, 2024

can you resolve conflict?

Rebased to latest main.

@zhztheplayer
Copy link
Member

We pass a target spill size to Velox API so usually the spill call stops when enough memory space is reclaimed. So a portion of the operators can be omitted in the procedure.

Will it still call shuffle's writer's spill anyway?

yes exactly

};

void MmapFileStream::advance(int64_t length) {
static auto pageSize = static_cast<size_t>(arrow::internal::GetPageSize());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

page should be too small. Can you use config of spark.shuffle.file.buffer?

@github-actions github-actions bot added the CORE works for Gluten Core label Nov 27, 2024
Copy link

Run Gluten Clickhouse CI on x86

2 similar comments
Copy link

Run Gluten Clickhouse CI on x86

Copy link

Run Gluten Clickhouse CI on x86

@@ -420,6 +420,7 @@ arrow::Status LocalPartitionWriter::mergeSpills(uint32_t partitionId) {
auto spillIter = spills_.begin();
while (spillIter != spills_.end()) {
ARROW_ASSIGN_OR_RAISE(auto st, dataFileOs_->Tell());
(*spillIter)->openForRead(options_.shuffleFileBufferSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you fix the failed UTs? It's likely the failures are caused by this change. You should add spill->openForRead(options_.shuffleFileBufferSize) before line 460
https://github.com/apache/incubator-gluten/pull/7861/files#diff-d58dca9ecda139d81d2630e1adb22ae01953ad4de0a9c086bd07c07619a3a927R460

Copy link

Run Gluten Clickhouse CI on x86

Copy link

Run Gluten Clickhouse CI on x86

Copy link

Run Gluten Clickhouse CI on x86

@marin-ma marin-ma merged commit 72b8810 into apache:main Nov 28, 2024
48 checks passed
weiting-chen pushed a commit that referenced this pull request Nov 29, 2024
* Impl MmapFileStream

* Prefetch spark.shuffle.file.buffer

* fix ut

* format

---------

Co-authored-by: Rong Ma <rong.ma@intel.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CORE works for Gluten Core VELOX
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[CORE] LocalParitionWriter causes OOM during mergeSpills
6 participants