[SPARK-4691][shuffle] Restructure a few lines in shuffle code #3553
Conversation
Can one of the admins verify this patch?
@@ -45,7 +45,7 @@ private[spark] class HashShuffleReader[K, C](
       } else {
         new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
       }
-    } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
+    } else if (dep.mapSideCombine) {
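For context, a rough sketch of how the branching in HashShuffleReader.read() looks once the redundant guard is dropped; only the lines in the hunk above and the final else (quoted in a review comment below) come from the PR, the rest of the structure is assumed:

    // Sketch of HashShuffleReader.read() after this change (simplified, not the exact file).
    val aggregatedIter: Iterator[Product2[K, C]] =
      if (dep.aggregator.isDefined) {
        if (dep.mapSideCombine) {
          new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
        } else {
          new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
        }
      } else if (dep.mapSideCombine) {
        // The removed `dep.aggregator.isEmpty &&` guard was redundant here:
        // this branch is only reached when the aggregator is not defined.
        throw new IllegalStateException("Aggregator is empty for map-side combine")
      } else {
        // Convert the Product2s to pairs since this is what downstream RDDs currently expect
        iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
      }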
I think the previous way is much clearer and more obvious, from my understanding :-).
"if(dep.aggregator.isDefined) else if (dep.aggregator.isEmpty)" seems duplicate. isEmpty == !isDefined
We need to do another one more judgement for "dep.aggregator.isEmpty".
Also, in SortShuffleWriter.scala, i think "dep.aggregator.isEmpty" is better than "!dep.aggregator.isDefined".
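As a small standalone illustration of that equivalence (plain Scala Option, not Spark code):

    // For any Option, isEmpty is the exact negation of isDefined, so a branch
    // already inside the else of `if (dep.aggregator.isDefined)` never needs
    // an additional `dep.aggregator.isEmpty` check.
    val someAgg: Option[String] = Some("aggregator")
    val noAgg: Option[String] = None
    assert(someAgg.isEmpty == !someAgg.isDefined)  // false == false
    assert(noAgg.isEmpty == !noAgg.isDefined)      // true == true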
Could also write this as:

    if (dep.aggregator.isDefined) {
      ...
    } else {
      require(!dep.mapSideCombine, "Map-side combine requested without Aggregator specified!")
      // Convert the Product2s to pairs since this is what downstream RDDs currently expect
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    }
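One note on this suggestion: Scala's require throws an IllegalArgumentException with the given message when its condition is false, so the redundant isEmpty branch disappears and a map-side combine requested without an aggregator still fails fast with a clear error.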
Yes, this seems simple and elegant.
@@ -50,7 +50,7 @@ private[spark] class SortShuffleWriter[K, V, C](
   /** Write a bunch of records to this task's output */
   override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
     if (dep.mapSideCombine) {
-      if (!dep.aggregator.isDefined) {
+      if (dep.aggregator.isEmpty) {
         throw new IllegalStateException("Aggregator is empty for map-side combine")
Could change this one to a require as well, and maybe change the message too.
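For illustration, applying that suggestion to the hunk above might look like this (a sketch, not the merged code; the message is borrowed from the earlier review comment):

    /** Write a bunch of records to this task's output */
    override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
      if (dep.mapSideCombine) {
        // require fails with IllegalArgumentException and this message when no aggregator
        // is set, replacing the explicit isEmpty check + IllegalStateException.
        require(dep.aggregator.isDefined, "Map-side combine requested without Aggregator specified!")
        // ... rest of the map-side-combine path unchanged ...
      }
      // ... rest of write() unchanged ...
    }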
OK, LGTM once you fix Aaron's comment.
Done.
add to whitelist. Just realized we never started tests for this.
Test build #24125 has started for PR 3553 at commit
Test build #24125 has finished for PR 3553 at commit
Test PASSed.
Can we get a better title for this issue?
Done, updated to a new title.
@pwendell any idea about this title?
I would use
Should probably just merge it into master, and thus it's independent of the 1.2 release, right?
NP, done with the title change; the priority is defined in JIRA.
@aarondav I wanted to backport this into branch-1.2 as well. It would be good to minimize the divergence between master and 1.2 if possible. I'm merging this into master now and I'll mark it
I've merged this into
In HashShuffleReader.scala and HashShuffleWriter.scala, no need to judge "dep.aggregator.isEmpty" again as this is judged by "dep.aggregator.isDefined".
In SortShuffleWriter.scala, "dep.aggregator.isEmpty" is better than "!dep.aggregator.isDefined"?

Author: maji2014 <maji3@asiainfo.com>

Closes #3553 from maji2014/spark-4691 and squashes the following commits:

bf7b14d [maji2014] change a elegant way for SortShuffleWriter.scala
10d0cf0 [maji2014] change a elegant way
d8f52dc [maji2014] code optimization for judgement

(cherry picked from commit b310744)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>