
[SPARK-4452][Core]Shuffle data structures can starve others on the same thread for memory #10024

Closed
wants to merge 21 commits

Conversation

lianhuiwang
Contributor

@lianhuiwang lianhuiwang commented Nov 28, 2015

What changes were proposed in this pull request?

#9241 implemented a mechanism to call spill() on those SQL operators that support spilling when there is not enough memory for execution.
However, ExternalSorter and AppendOnlyMap in Spark core do not benefit from it. This PR extends the #9241 mechanism to them, so that when there is not enough memory for execution, memory can be reclaimed by spilling ExternalSorter and AppendOnlyMap in Spark core.
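
To make the idea concrete, here is a minimal sketch (not the actual ExternalSorter/AppendOnlyMap code; the class name SpillableBuffer and its methods are illustrative assumptions). A spillable collection keeps records in memory, and when the memory manager asks it to give memory back, it writes whatever is still in memory to a temp file and drops the buffer:

import java.io.{File, PrintWriter}
import scala.collection.mutable.ArrayBuffer
import scala.io.Source

class SpillableBuffer {
  private var inMemory = new ArrayBuffer[String]()  // records still held in memory
  private var spillFiles: List[File] = Nil          // records already written to disk

  def insert(record: String): Unit = inMemory += record

  // Conceptually invoked when another consumer on the same thread needs memory:
  // write everything still in memory to a temp file, drop the buffer, and
  // report how many records were freed.
  def spill(): Long = {
    if (inMemory.isEmpty) return 0L
    val file = File.createTempFile("spill", ".txt")
    val out = new PrintWriter(file)
    try inMemory.foreach(r => out.println(r)) finally out.close()
    spillFiles = file :: spillFiles
    val freed = inMemory.size.toLong
    inMemory = new ArrayBuffer[String]()            // release the in-memory data
    freed
  }

  // Iteration merges the spilled files with whatever is still in memory.
  def iterator: Iterator[String] =
    spillFiles.reverseIterator.flatMap(f => Source.fromFile(f).getLines()) ++ inMemory.iterator
}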

How was this patch tested?

Added two unit tests for it.

@SparkQA

SparkQA commented Nov 28, 2015

Test build #46839 has finished for PR 10024 at commit 34f2441.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lianhuiwang
Contributor Author

test this please

@SparkQA

SparkQA commented Nov 29, 2015

Test build #46852 has finished for PR 10024 at commit 34f2441.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 24, 2016

Test build #49948 has finished for PR 10024 at commit b561641.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 24, 2016

Test build #49951 has finished for PR 10024 at commit 16ca87b.

  • This patch fails R style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Apr 18, 2016

Hi @lianhuiwang thanks for submitting this patch.

I just have a high-level question first. If I understand how this works correctly, the idea is that:

  1. as data is added to Spillables, they work exactly the same as before at first, so after all records have been inserted, they've still got a bunch of data in memory.
  2. When an iterator is requested, as before it is still an iterator that merges data that has been spilled to disk with data that is still in memory
  3. If the TaskMemoryManager requests more memory while that iterator is in flight, then the Spillables look at the position of the current iterator over the in-memory data, and spill only the remaining data to disk
  4. The Spillable then frees the memory to the TaskMemoryManager and has the in-flight iterator switch to using the new spill file.

Is this a correct understanding? If so, this seems to hinge on one key assumption: that the Spillables are never iterated over multiple times. If they were iterated multiple times, then the second iteration would be incorrect -- some of the initial data in the in-memory structure would be lost on the second iteration.

I think this assumption is sound -- it is implied by the "destructive" SortedIterator in the internals, though I think the actual wrapping Spillables might allow multiple iteration now. I've been trying to think of a case where this assumption would be violated but can't come up with anything. (If the same shuffle data is read multiple times in different tasks, the work on the shuffle-read side is simply repeated each time; I'm pretty sure there isn't any sharing.) But nonetheless, if that is the case, I think this deserves a lot of comments explaining how it works, assertions which make sure the assumption is not violated, and a number of tests.

FWIW, I started down the path of writing something similar without that assumption -- when a spill was requested on an in-flight iterator, the entire in-memory structure would get spilled to disk, and the in-flight iterator would switch to the spilled data and advance to the same position it had reached over the in-memory data. This was pretty convoluted, and as I started writing tests I realized there were corner cases that needed work, so I decided to submit the simpler change instead. It seems much easier to do it your way. I do have some tests which I think I can add as well -- lemme dig those up and send them later today.
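
To make steps 3 and 4 above concrete, here is an illustrative sketch only (not the patch's actual code; SwitchableIterator and the writeToDisk callback are hypothetical names): on a spill request, only the records not yet returned by next() are pushed to disk, and the in-flight iterator keeps going from the on-disk copy. Once this has happened, a second full pass over the data is impossible, which is exactly the single-iteration assumption discussed above.

class SwitchableIterator[T](
    private var upstream: Iterator[T],
    writeToDisk: Iterator[T] => Iterator[T]) extends Iterator[T] {

  // Spill only the unread remainder; records already handed to the caller
  // are not written out, so they cannot be replayed later.
  def spillRemaining(): Unit = {
    upstream = writeToDisk(upstream)
  }

  override def hasNext: Boolean = upstream.hasNext
  override def next(): T = upstream.next()
}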

/**
* Allocates a heap memory of `size`.
*/
public long allocateHeapExecutionMemory(long size) {
Contributor

This function does not actually create any object; I'd prefer to call it acquireOnHeapMemory

@davies
Contributor

davies commented Apr 18, 2016

@lianhuiwang Thanks for working on this; I think it's headed in the right direction. Two things are left:

  1. Thread safety. For example, PythonRDD (and likewise RRDD) uses two threads: one iterates rows from the parent RDD, and the other iterates rows from the PythonRDD/RRDD itself. The second thread could trigger spilling while the first thread is consuming the same iterator, so the iterators must be made thread safe. This is the hardest part; you could take the SQL operators as examples.

  2. Adding more tests. As @squito suggested, more comments explaining the high-level ideas would also be good to have.

@SparkQA

SparkQA commented Apr 18, 2016

Test build #56099 has finished for PR 10024 at commit 7c36ef0.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkSqlAstBuilder extends AstBuilder

@lianhuiwang
Contributor Author

@davies Thanks. I have added a SpillableIterator that makes consuming and spilling thread safe. Please take a look.
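
For illustration only, a hedged sketch of the locking idea (not the actual SpillableIterator added by the patch; the names below are assumptions): reads and spills synchronize on a single lock, so a spill triggered from another thread cannot interleave with hasNext()/next() on the consuming thread.

class LockedSpillableIterator[T](
    private var upstream: Iterator[T],
    writeToDisk: Iterator[T] => Iterator[T]) extends Iterator[T] {

  private val lock = new Object

  // May be called from a different thread than the one consuming the iterator.
  def spillRemaining(): Unit = lock.synchronized {
    upstream = writeToDisk(upstream)
  }

  override def hasNext: Boolean = lock.synchronized { upstream.hasNext }
  override def next(): T = lock.synchronized { upstream.next() }
}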

@SparkQA

SparkQA commented Apr 19, 2016

Test build #56229 has finished for PR 10024 at commit 70bcffa.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lianhuiwang
Contributor Author

lianhuiwang commented Apr 19, 2016

@squito Yes, your understanding is correct. This PR only supports calling a Spillable's iterator once; code like 'val sort = new Spillable(); sort.iterator(); sort.iterator()' would be wrong.
For ExternalSorter.iterator, I find it is only called by BlockStoreShuffleReader.read(). Every reader creates a new ExternalSorter, so I think it is fine.
For ExternalAppendOnlyMap, there are two places that use it: Aggregator and CoGroupedRDD. Both create a new ExternalAppendOnlyMap before calling iterator() once, so I think it is also fine.
Their input and output are both Iterators. When an iterator is cached in memory, the (k, v) pairs are copied into a vector that can be read many times later; if it is persisted on disk, the data is spilled to disk directly and can be read back later. So caching RDDs built on these Spillables is not affected.
BTW, this PR has been running for a long time on many of our production Spark jobs.
If I missed anything, please point it out. Thanks.

@SparkQA

SparkQA commented Apr 19, 2016

Test build #56246 has finished for PR 10024 at commit b84ad96.

  • This patch fails MiMa tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 20, 2016

Test build #56365 has finished for PR 10024 at commit dc632f5.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@lianhuiwang
Contributor Author

@davies Yes, I have updated it to use an object lock. I will rebase onto master.

@SparkQA

SparkQA commented Apr 21, 2016

Test build #56465 has finished for PR 10024 at commit d16b5f3.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lianhuiwang
Contributor Author

test it please.

@lianhuiwang
Contributor Author

Jenkins, test this please

@SparkQA

SparkQA commented Apr 21, 2016

Test build #56456 has finished for PR 10024 at commit 97fd174.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 21, 2016

Test build #56459 has finished for PR 10024 at commit e009d95.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 21, 2016

Test build #56462 has finished for PR 10024 at commit 7ea7274.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 21, 2016

Test build #56466 has finished for PR 10024 at commit e7a98d5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lianhuiwang
Contributor Author

@davies All tests have passed now. Could you take another look? Thanks.

@SparkQA

SparkQA commented Apr 21, 2016

Test build #56475 has finished for PR 10024 at commit e7a98d5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val freeMemory = myMemoryThreshold - initialMemoryThreshold
_memoryBytesSpilled += freeMemory
releaseMemory()
freeMemory
Contributor

We should free the memory first, then release it

Contributor Author

It already does collection = null in forceSpill(), before releaseMemory().
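
For readers following this exchange, a self-contained sketch of the ordering being described (the field and method names echo the quoted snippet, but the class itself is illustrative, not the patch): the in-memory collection is dropped first, the spill metric is updated, and only then is the memory handed back.

class SpillOrderingSketch {
  private var collection: Array[Long] = new Array[Long](1024)  // stand-in for the in-memory data
  private var myMemoryThreshold: Long = 8L * 1024 * 1024
  private val initialMemoryThreshold: Long = 1024L * 1024
  private var _memoryBytesSpilled: Long = 0L

  private def releaseMemory(): Unit = {
    myMemoryThreshold = initialMemoryThreshold  // give the extra memory back to the manager
  }

  def forceSpillAndFree(): Long = {
    collection = null                           // drop the in-memory data first (as in forceSpill())
    val freeMemory = myMemoryThreshold - initialMemoryThreshold
    _memoryBytesSpilled += freeMemory           // account for what was spilled
    releaseMemory()                             // then release the memory
    freeMemory
  }
}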

@davies
Contributor

davies commented Apr 21, 2016

LGTM

@davies
Contributor

davies commented Apr 21, 2016

@lianhuiwang Have you run some stress tests with latest change?

@davies
Contributor

davies commented Apr 21, 2016

This is a big change, maybe not.

@zzcclp
Contributor

zzcclp commented Apr 21, 2016

Sorry, I mistakenly deleted my comment.
What a pity; I can only merge this PR manually.

@lianhuiwang
Contributor Author

lianhuiwang commented Apr 21, 2016

@davies I have also run the unit tests with a large N.
I think @JoshRosen and @andrewor14 could also take a look at this PR. Thanks.

@SparkQA

SparkQA commented Apr 21, 2016

Test build #56496 has finished for PR 10024 at commit ff3c2b8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lianhuiwang
Contributor Author

Jenkins, test this please

@SparkQA

SparkQA commented Apr 21, 2016

Test build #56513 has finished for PR 10024 at commit ff3c2b8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 21, 2016

Test build #56532 has finished for PR 10024 at commit 792ff5a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser
    • class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder
    • * [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
    • case class AddJar(path: String) extends RunnableCommand
    • case class AddFile(path: String) extends RunnableCommand
    • case class CreateTableAsSelectLogicalPlan(
    • case class CreateViewAsSelectLogicalCommand(
    • case class HiveSerDe(
    • class HiveSqlParser(conf: SQLConf, hiveconf: HiveConf) extends AbstractSqlParser
    • class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf)

@davies
Contributor

davies commented Apr 21, 2016

Merging this into master, thanks!

@asfgit asfgit closed this in 4f36917 Apr 21, 2016
@lianhuiwang
Contributor Author

@davies Thanks.

zzcclp added a commit to zzcclp/spark that referenced this pull request Apr 22, 2016
…he same thread for memory apache#10024

[SPARK-14007] [SQL] Manage the memory used by hash map in shuffled hash join  (just TaskMemoryManager.java)
[SPARK-13113] [CORE] Remove unnecessary bit operation when decoding page number
Parth-Brahmbhatt pushed a commit to Parth-Brahmbhatt/spark that referenced this pull request Jul 25, 2016
…same thread for memory

In apache#9241 a mechanism was implemented to call spill() on those SQL operators that support spilling when there is not enough memory for execution.
However, ExternalSorter and AppendOnlyMap in Spark core do not benefit from it. This PR extends the apache#9241 mechanism to them, so that when there is not enough memory for execution, memory can be reclaimed by spilling ExternalSorter and AppendOnlyMap in Spark core.

Added two unit tests for it.

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes apache#10024 from lianhuiwang/SPARK-4452-2.