[SPARK-25664][SQL][TEST] Refactor JoinBenchmark to use main method #22661
Conversation
Test build #97080 has finished for PR 22661 at commit
Test build #97090 has finished for PR 22661 at commit
val N = 20 << 20
val M = 1 << 16

val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
So, this is a removal of a redundant one, right?
Yes
val dim = broadcast(sparkSession.range(M).selectExpr("cast(id/10 as long) as k"))
val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
codegenBenchmark("Join w long duplicated", N) {
val dim = broadcast(spark.range(M).selectExpr("cast(id/10 as long) as k"))
According to another benchmark case in this file, `broadcast` seems to be put outside of `codegenBenchmark`. What do you think about this?
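For illustration, a minimal sketch of that restructuring, assuming the surrounding benchmark class provides `spark`, `N`, `M`, the `codegenBenchmark` helper, and the usual `org.apache.spark.sql.functions` imports; forcing execution with `count()` is an assumption of this sketch, not necessarily the PR's exact code:

```scala
// Build the broadcast dimension once, outside the measured block,
// matching the other cases in this file.
val dim = broadcast(spark.range(M).selectExpr("cast(id/10 as long) as k"))

codegenBenchmark("Join w long duplicated", N) {
  val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
  df.count() // force execution of the join (assumed sink for this sketch)
}
```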
Join w 2 ints:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Join w 2 ints wholestage off            138514 / 139178          0.2        6604.9       1.0X
Join w 2 ints wholestage on             129908 / 140869          0.2        6194.5       1.1X
Ur, is this correct? Previously, we had the following:
*Join w 2 ints codegen=false 4426 / 4501 4.7 211.1 1.0X
*Join w 2 ints codegen=true 791 / 818 26.5 37.7 5.6X
I think it's correct. I ran it on master:
build/sbt "sql/test-only *benchmark.JoinBenchmark"
......
[info] JoinBenchmark:
[info] - broadcast hash join, long key !!! IGNORED !!!
[info] - broadcast hash join, long key with duplicates !!! IGNORED !!!
Running benchmark: Join w 2 ints
Running case: Join w 2 ints wholestage off
Stopped after 2 iterations, 307335 ms
Running case: Join w 2 ints wholestage on
Stopped after 5 iterations, 687107 ms
Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
Join w 2 ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Join w 2 ints wholestage off 153532 / 153668 0.1 7321.0 1.0X
Join w 2 ints wholestage on 132075 / 137422 0.2 6297.8 1.2X
Oh, interesting. Although it's beyond the scope, could you please also run it on branch-2.4 and branch-2.3?
def broadcastHashJoinLongKeyWithDuplicates(): Unit = {
  val N = 20 << 20
  val M = 1 << 16

  val dim = broadcast(spark.range(M).selectExpr("cast(id/10 as long) as k"))
For this change, we need to rerun the benchmark to get a new result.
Test build #97243 has finished for PR 22661 at commit
Test build #97249 has finished for PR 22661 at commit
@@ -19,229 +19,161 @@ package org.apache.spark.sql.execution.benchmark

import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType

/**
 * Benchmark to measure performance for aggregate primitives.
`aggregate primitives` -> `joins`
 *shuffle hash join codegen=false           2005 / 2010          2.1        478.0       1.0X
 *shuffle hash join codegen=true            1773 / 1792          2.4        422.7       1.1X
 */
override def runBenchmarkSuite(): Unit = {
Could you wrap the following (lines 168~177) with something like `runBenchmark("Join Benchmark")`?
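A sketch of the suggested wrapping, assuming the `runBenchmark` helper provided by the benchmark base class used in this PR; the case method names are illustrative:

```scala
override def runBenchmarkSuite(): Unit = {
  runBenchmark("Join Benchmark") {
    // Illustrative case methods; the real file groups its join cases here.
    broadcastHashJoinLongKey()
    broadcastHashJoinLongKeyWithDuplicates()
    // ... the remaining cases (two int keys, sort merge join, shuffle hash join) ...
  }
}
```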
Test build #97279 has finished for PR 22661 at commit
retest this please
Test build #97287 has finished for PR 22661 at commit
runBenchmark("merge join", N) {
val df1 = sparkSession.range(N).selectExpr(s"id * 2 as k1")
val df2 = sparkSession.range(N).selectExpr(s"id * 3 as k2")
codegenBenchmark("merge join", N) {
`merge join` -> `sort merge join`
@wangyum. Could you review and merge wangyum#18?
 *-------------------------------------------------------------------------------------------
 *Join w 2 ints codegen=false                4426 / 4501          4.7        211.1       1.0X
 *Join w 2 ints codegen=true                  791 /  818         26.5         37.7       5.6X
 */
Hi, @cloud-fan, @gatorsmile, @davies, @rxin.
We are hitting some performance slowdown in this benchmark. However, this is not a regression because it's consistent across 2.0.2 ~ 2.4.0-rc3.
Join w 2 ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Join w 2 ints wholestage off 157742 / 158892 0.1 7521.7 1.0X
Join w 2 ints wholestage on 134290 / 152917 0.2 6403.4 1.2X
According to the original performance number, it seems to be a result from when `HashJoin.rewriteKeyExpr` used a simple upcast to `bigint`. However, the current code generates a result where `HashJoin.rewriteKeyExpr` uses `shiftleft` operations.
scala> val df = spark.range(N).join(dim2, (col("id") % M).cast(IntegerType) === col("k1") && (col("id") % M).cast(IntegerType) === col("k2"))
scala> val df2 = spark.range(N).join(dim2, (col("id") % M) === col("k1") && (col("id") % M) === col("k2"))
scala> df.explain
== Physical Plan ==
*(2) BroadcastHashJoin [cast((id#8L % 65536) as int), cast((id#8L % 65536) as int)], [k1#2, k2#3], Inner, BuildRight
:- *(2) Range (0, 20971520, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft(cast(input[0, int, false] as bigint), 32) | (cast(input[1, int, false] as bigint) & 4294967295))))
+- *(1) Project [cast(id#0L as int) AS k1#2, cast(id#0L as int) AS k2#3, cast(id#0L as string) AS v#4]
+- *(1) Range (0, 65536, step=1, splits=8)
scala> df2.explain
== Physical Plan ==
*(2) BroadcastHashJoin [(id#23L % 65536), (id#23L % 65536)], [cast(k1#2 as bigint), cast(k2#3 as bigint)], Inner, BuildRight
:- *(2) Range (0, 20971520, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint), cast(input[1, int, false] as bigint)))
+- *(1) Project [cast(id#0L as int) AS k1#2, cast(id#0L as int) AS k2#3, cast(id#0L as string) AS v#4]
+- *(1) Range (0, 65536, step=1, splits=8)
Did we really want to measure the difference in `HashJoin.rewriteKeyExpr`?
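For reference, a standalone Scala illustration of the key packing visible in the first plan above (two int keys folded into one bigint via `shiftleft`/`or`), as opposed to the plain upcast in the second plan; the sample values are made up:

```scala
// Packs two 32-bit keys into one 64-bit key: (k1 << 32) | (k2 & 0xFFFFFFFF),
// mirroring the expression shown in the HashedRelationBroadcastMode above.
def packTwoIntKeys(k1: Int, k2: Int): Long =
  (k1.toLong << 32) | (k2.toLong & 4294967295L)

assert(packTwoIntKeys(1, 2) == (1L << 32) + 2)
```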
Any advice is welcome, and thank you in advance, @cloud-fan, @gatorsmile, @davies, @rxin.
This seems to be caused by the bug fix: #15390. So the performance is reasonable.
Thank you for the confirmation, @cloud-fan!
@@ -200,11 +200,12 @@ private[spark] object Benchmark {
def getProcessorName(): String = {
  val cpu = if (SystemUtils.IS_OS_MAC_OSX) {
    Utils.executeAndGetOutput(Seq("/usr/sbin/sysctl", "-n", "machdep.cpu.brand_string"))
      .stripLineEnd
Because the Mac output has one more line than the Linux output: 28f9b9a#diff-45c96c65f7c46bc2d84843a7cb92f22fL7
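A tiny standalone illustration of why `.stripLineEnd` is needed on the macOS path; the sample string is assumed, modeled on the CPU name printed in the run above:

```scala
// sysctl -n prints the brand string followed by a trailing newline,
// which stripLineEnd removes before the value is used in the benchmark header.
val macOutput = "Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz\n"
assert(macOutput.stripLineEnd == "Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz")
```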
Ur.. I'm not a fan of piggy-backing. Okay.
Test build #97299 has finished for PR 22661 at commit
Join w 2 ints:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Join w 2 ints wholestage off            173174 / 173183          0.1        8257.6       1.0X
Join w 2 ints wholestage on             166350 / 198362          0.1        7932.2       1.0X
It surprises me that whole-stage codegen doesn't help here. We should investigate it later.
+1.
Test build #97301 has finished for PR 22661 at commit
+1, LGTM. Thank you, @wangyum and @cloud-fan.
Merged to master.
## What changes were proposed in this pull request?

Refactor `JoinBenchmark` to use main method.

1. Use `spark-submit`:
```console
bin/spark-submit --class org.apache.spark.sql.execution.benchmark.JoinBenchmark --jars ./core/target/spark-core_2.11-3.0.0-SNAPSHOT-tests.jar ./sql/catalyst/target/spark-sql_2.11-3.0.0-SNAPSHOT-tests.jar
```

2. Generate the benchmark result:
```console
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.JoinBenchmark"
```

## How was this patch tested?

Manual tests.

Closes apache#22661 from wangyum/SPARK-25664.

Lead-authored-by: Yuming Wang <yumwang@ebay.com>
Co-authored-by: Yuming Wang <wgyumg@gmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
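Not the PR's code, but a self-contained sketch of the same join workload driven from a plain main method, to illustrate the main-method style the refactoring moves to; the sizes mirror the `20 << 20` / `1 << 16` constants in the benchmark cases above, and the timing/printing is deliberately simplified compared to the real benchmark framework:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, col}

object JoinBenchmarkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("JoinBenchmarkSketch")
      .getOrCreate()

    val N = 20 << 20
    val M = 1 << 16

    // Broadcast hash join, long key with duplicates (same shape as the benchmark case above).
    val dim = broadcast(spark.range(M).selectExpr("cast(id/10 as long) as k"))
    val start = System.nanoTime()
    val rows = spark.range(N).join(dim, (col("id") % M) === col("k")).count()
    val elapsedMs = (System.nanoTime() - start) / 1e6
    println(s"Join w long duplicated: $rows rows in $elapsedMs ms")

    spark.stop()
  }
}
```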