[SPARK-3074] [PySpark] support groupByKey() with single huge key #1977

Closed
wants to merge 49 commits into from

davies
Contributor

@davies davies commented Aug 16, 2014

This patch changes groupByKey() to use an external sort-based approach, so that it can support a single huge key.

For example, it can group a dataset that includes one hot key with 40 million values (strings) using 500M of memory for the Python worker, finishing in about 2 minutes. (The hash-based approach would need 6G of memory.)

During groupByKey(), it does an in-memory groupBy first. If the dataset cannot fit in memory, the data is partitioned by hash. If one partition still cannot fit in memory, it switches to the sort-based groupBy().
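
The sort-based fallback described above can be illustrated with a minimal, standard-library sketch. This is not the code in this patch: the names `sort_based_group_by_key`, `_spill`, `_read_run`, and the `max_items_in_memory` parameter are hypothetical, and a simple item count stands in for the real memory limit. It sorts bounded chunks, spills each sorted run to a temporary file, merges the runs lazily, and groups adjacent equal keys, so the values of a single hot key are streamed rather than held in memory.

```python
import heapq
import itertools
import operator
import pickle
import tempfile


def _spill(sorted_chunk):
    """Write one sorted run to a temporary file and rewind it for reading."""
    f = tempfile.TemporaryFile()
    for item in sorted_chunk:
        pickle.dump(item, f)
    f.seek(0)
    return f


def _read_run(f):
    """Lazily yield the items of one spilled run."""
    while True:
        try:
            yield pickle.load(f)
        except EOFError:
            return


def sort_based_group_by_key(pairs, max_items_in_memory=1000):
    """Yield (key, iterator_of_values); keys must be orderable.

    max_items_in_memory is a hypothetical stand-in for a memory limit.
    """
    key_of = operator.itemgetter(0)
    runs = []
    it = iter(pairs)
    while True:
        # Only one chunk is materialized at a time.
        chunk = list(itertools.islice(it, max_items_in_memory))
        if not chunk:
            break
        chunk.sort(key=key_of)
        runs.append(_spill(chunk))
    # Merge the sorted runs lazily, comparing by key only.
    merged = heapq.merge(*(_read_run(f) for f in runs), key=key_of)
    for k, group in itertools.groupby(merged, key=key_of):
        yield k, (v for _, v in group)


pairs = [("hot", i) for i in range(5)] + [("cold", 99)]
# Each group's values must be consumed before advancing to the next key.
result = {k: list(vs) for k, vs in sort_based_group_by_key(pairs, 2)}
```

Each group's value iterator is only valid until the next key is requested, which is the usual trade-off of streaming a sorted sequence instead of building a hash table of lists.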

@SparkQA

SparkQA commented Aug 16, 2014

QA tests have started for PR 1977 at commit 083d842.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 16, 2014

QA tests have started for PR 1977 at commit d05060d.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 16, 2014

QA tests have finished for PR 1977 at commit 083d842.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SameKey(object):
    • class GroupByKey(object):
    • class ResultIterable(object):
    • class ExternalSorter(object):

@SparkQA

SparkQA commented Aug 16, 2014

QA tests have finished for PR 1977 at commit d05060d.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SameKey(object):
    • class GroupByKey(object):
    • class ResultIterable(object):
    • class ExternalSorter(object):

@sryza
Contributor

sryza commented Aug 16, 2014

Does / will the same functionality exist in Scala/Java?

@andrewor14
Contributor

I believe this is one of those few things in Spark where Python is ahead of Scala.

@davies
Contributor Author

davies commented Aug 16, 2014

@sryza There are similar things in Scala, but we cannot compare Python objects in Scala, so this cannot use Scala's groupByKey() directly. All of the aggregation has to be implemented in Python as well.

@andrewor14, I hope PySpark can catch up with Scala.

davies added 2 commits August 15, 2014 23:17
this will reduce the memory used when merging many files together.
@SparkQA

SparkQA commented Aug 16, 2014

QA tests have started for PR 1977 at commit efa23df.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 16, 2014

QA tests have finished for PR 1977 at commit efa23df.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class Serializer
    • abstract class SerializerInstance
    • abstract class SerializationStream
    • abstract class DeserializationStream
    • class ShuffleBlockManager(blockManager: BlockManager,
    • class SameKey(object):
    • class GroupByKey(object):
    • class ResultIterable(object):
    • class FlattedValuesSerializer(BatchedSerializer):
    • class ExternalSorter(object):

@davies
Contributor Author

davies commented Aug 16, 2014

Jenkins, retest this please.

@SparkQA

SparkQA commented Aug 16, 2014

QA tests have started for PR 1977 at commit b40bae7.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 16, 2014

QA tests have finished for PR 1977 at commit b40bae7.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SameKey(object):
    • class GroupByKey(object):
    • class ResultIterable(object):
    • class FlattedValuesSerializer(BatchedSerializer):
    • class ExternalSorter(object):

@davies davies changed the title [SPARK-3074] [PySpark] support groupByKey() with single huge key [WIP] [SPARK-3074] [PySpark] support groupByKey() with single huge key Aug 18, 2014
@SparkQA

SparkQA commented Aug 19, 2014

QA tests have started for PR 1977 at commit 1ea0669.

  • This patch does not merge cleanly!

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have finished for PR 1977 at commit 1ea0669.

  • This patch fails unit tests.
  • This patch does not merge cleanly!

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have started for PR 1977 at commit 085aef8.

  • This patch merges cleanly.

@davies davies changed the title [WIP] [SPARK-3074] [PySpark] support groupByKey() with single huge key [SPARK-3074] [PySpark] support groupByKey() with single huge key Aug 19, 2014
@SparkQA

SparkQA commented Aug 19, 2014

QA tests have started for PR 1977 at commit 11ba318.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have finished for PR 1977 at commit 085aef8.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ResultIterable(object):
    • class FlattedValuesSerializer(BatchedSerializer):
    • class ExternalSorter(object):
    • class SameKey(object):
    • class GroupByKey(object):
    • class ExternalGroupBy(ExternalMerger):

@SparkQA

SparkQA commented Aug 19, 2014

QA tests have finished for PR 1977 at commit 11ba318.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")
    • case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")
    • case class Params(input: String = "data/mllib/sample_binary_classification_data.txt")
    • class ResultIterable(object):
    • class FlattedValuesSerializer(BatchedSerializer):
    • class ExternalSorter(object):
    • class SameKey(object):
    • class GroupByKey(object):
    • class ExternalGroupBy(ExternalMerger):

@davies
Contributor Author

davies commented Apr 8, 2015

@JoshRosen The last comments have been addressed.

@SparkQA

SparkQA commented Apr 8, 2015

Test build #29900 has started for PR 1977 at commit 0b0fde8.

@SparkQA

SparkQA commented Apr 8, 2015

Test build #29895 has finished for PR 1977 at commit 0dcf320.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FlattenedValuesSerializer(BatchedSerializer):
    • class ExternalList(object):
    • class GroupByKey(object):
    • class ChainedIterable(object):
    • class ExternalGroupBy(ExternalMerger):
  • This patch does not change any dependencies.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29895/

@SparkQA

SparkQA commented Apr 9, 2015

Test build #29900 has finished for PR 1977 at commit 0b0fde8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FlattenedValuesSerializer(BatchedSerializer):
    • class ExternalList(object):
    • class ExternalListOfList(ExternalList):
    • class GroupByKey(object):
    • class GroupListsByKey(GroupByKey):
    • class ChainedIterable(object):
    • class ExternalGroupBy(ExternalMerger):
  • This patch does not change any dependencies.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29900/

self.count += len(value) - 1


class GroupByKey(object):
Contributor

It looks like we only directly use GroupByKey in tests, while the actual shuffle code only uses GroupListsByKey. Is this intentional?

Contributor Author

Yes, I want the code and tests for ExternalList and GroupByKey to be easy to understand.

@JoshRosen
Contributor

Sorry for my initial confusion regarding the external lists of lists. I think that the __len__ thing might be an issue if we ever directly expose ExternalListOfList to users, but it looks like we currently only expose it through a ChainedIterable in this code, so it doesn't appear to be a problem yet. This still might be worth addressing if you agree that it could help prevent future bugs if we start using this in more places.
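
As a rough illustration of the `__len__` concern, here is a hypothetical in-memory stand-in (the class name `ListOfLists` is an assumption and this is not the PySpark `ExternalListOfList`, which also spills to disk). It stores lists internally but keeps `len()` and iteration consistent by counting and yielding the flattened values, so a caller that sees it directly gets the same answer from `len(x)` and `len(list(x))`.

```python
class ListOfLists(object):
    """Hypothetical stand-in: stores lists, behaves like one flat sequence."""

    def __init__(self):
        self._chunks = []
        self._count = 0

    def append(self, values):
        # Store the whole list, but count its individual values.
        self._chunks.append(values)
        self._count += len(values)

    def __len__(self):
        # Number of flattened values, matching what __iter__ yields.
        return self._count

    def __iter__(self):
        for chunk in self._chunks:
            for v in chunk:
                yield v


x = ListOfLists()
x.append([1, 2])
x.append([3])
```

If `__iter__` yielded the inner lists while `__len__` reported the flattened count, the two would disagree, which is the kind of mismatch that could surface as a bug once the class is exposed more widely.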

@davies
Contributor Author

davies commented Apr 9, 2015

@JoshRosen Thanks for the comments, it looks better now.

@SparkQA

SparkQA commented Apr 9, 2015

Test build #29921 has started for PR 1977 at commit e78c15c.

@SparkQA

SparkQA commented Apr 9, 2015

Test build #29921 has finished for PR 1977 at commit e78c15c.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FlattenedValuesSerializer(BatchedSerializer):
    • class ExternalList(object):
    • class ExternalListOfList(ExternalList):
    • class GroupByKey(object):
    • class ExternalGroupBy(ExternalMerger):
  • This patch does not change any dependencies.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29921/

@SparkQA

SparkQA commented Apr 9, 2015

Test build #29925 has started for PR 1977 at commit 67772dd.

@SparkQA

SparkQA commented Apr 9, 2015

Test build #29925 has finished for PR 1977 at commit 67772dd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FlattenedValuesSerializer(BatchedSerializer):
    • class ExternalList(object):
    • class ExternalListOfList(ExternalList):
    • class GroupByKey(object):
    • class ExternalGroupBy(ExternalMerger):
  • This patch does not change any dependencies.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29925/

@JoshRosen
Contributor

I spent a bit of time fuzz-testing this code to try to reach 100% coverage of the changes in this patch. While doing so, I think I uncovered a bug:

../Spark/python/pyspark/shuffle.py:383: in _external_items
    for v in self._merged_items(i):
../Spark/python/pyspark/shuffle.py:826: in <genexpr>
    return ((k, vs) for k, vs in GroupByKey(sorted_items))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <pyspark.shuffle.GroupByKey object at 0x1048d0990>

    def next(self):
>       key, value = self.next_item if self.next_item else next(self.iterator)
E       TypeError: list object is not an iterator

../Spark/python/pyspark/shuffle.py:669: TypeError

It looks like the GroupByKey object expects to be instantiated with an iterator, but in GroupBy._merge_sorted_items we end up calling it with the output of ExternalSorter.sorted. There is a branch in ExternalSorter.sorted (line 517) that returns current_chunk, which is a list rather than an iterator.
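
The failure mode can be reproduced in isolation: `next()` requires an iterator, and a plain list is only an iterable. The sketch below is not the fix applied in this PR (which is not shown in this thread); `first_pair` is a hypothetical helper that shows one common remedy, normalizing the input with `iter()`, which returns an iterator unchanged and wraps a list in a new iterator.

```python
def first_pair(sorted_items):
    """Return the first (key, value) pair from any iterable (hypothetical helper)."""
    iterator = iter(sorted_items)  # returns iterators as-is, wraps lists
    return next(iterator)


chunk = [("a", 1), ("b", 2)]

# Calling next() directly on a list raises the TypeError from the traceback.
try:
    next(chunk)
    error = None
except TypeError as exc:
    error = str(exc)

from_list = first_pair(chunk)
from_iterator = first_pair(iter(chunk))
```

Either the producer can always return an iterator or the consumer can call `iter()` on whatever it receives; both make the list and iterator branches behave the same.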

@davies
Contributor Author

davies commented Apr 9, 2015

@JoshRosen Good catch! Fixed it.

@SparkQA

SparkQA commented Apr 9, 2015

Test build #29967 has started for PR 1977 at commit af3713a.

@SparkQA

SparkQA commented Apr 9, 2015

Test build #29967 has finished for PR 1977 at commit af3713a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class FlattenedValuesSerializer(BatchedSerializer):
    • class ExternalList(object):
    • class ExternalListOfList(ExternalList):
    • class GroupByKey(object):
    • class ExternalGroupBy(ExternalMerger):
  • This patch does not change any dependencies.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29967/

@JoshRosen
Contributor

LGTM. I spent more time testing this locally, commenting out various memory threshold flags as necessary in order to get good branch coverage, and didn't find any new problems. We should definitely do performance benchmarking of this feature during the 1.4 QA period in order to quantify its impact, but that isn't a blocker to merging this now. If this does turn out to have any performance issues for certain workloads, users should be able to feature-flag it by configuring Spark with a higher spilling threshold (or we could introduce a new flag specifically to bypass this).

I'm going to merge this into master (1.4.0). Thanks!

@asfgit asfgit closed this in b5c51c8 Apr 10, 2015
@davies
Contributor Author

davies commented Apr 10, 2015

Thanks a lot for testing it; it did help us find a bug!

9 participants