[SPARK-1357] [MLLIB] Annotate developer and experimental APIs #298
Conversation
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
All automated tests passed.
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
All automated tests passed.
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13844/
@manishamde Could you take a look at the annotations for decision tree? I marked all classes that users do not need as package private and a few interfaces developer/experimental. It would be easier to check the changes if you merge the changes, run …
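For context, the annotation pattern under discussion looks roughly like the minimal sketch below. The class, trait, and helper names are illustrative placeholders, not the actual diff; only the `@DeveloperApi` / `@Experimental` annotations, the `:: Experimental ::` ScalaDoc markers, and the `private[tree]` visibility reflect the mechanism itself:

```scala
package org.apache.spark.mllib.tree

import org.apache.spark.annotation.{DeveloperApi, Experimental}

/**
 * :: Experimental ::
 * User-facing entry point; marked experimental because the API may still
 * change between releases.
 */
@Experimental
class DecisionTreeEntryPoint(val maxDepth: Int)  // hypothetical class name

/**
 * :: DeveloperApi ::
 * Extension point intended for library developers rather than end users.
 */
@DeveloperApi
trait ImpurityLike {  // hypothetical trait name
  def calculate(labelCounts: Array[Double]): Double
}

// Internal helper hidden from the public API via package-private visibility.
private[tree] class NodeStatsHelper  // hypothetical helper class
```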
@mengxr Sure. I will take a look at the annotations tonight. |
LGTM! A couple of minor suggestions: …
Merged build triggered.
@manishamde I overlooked …
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13887/
add default parameters to JavaDoc
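A hedged sketch of what documenting default parameter values in the generated docs can look like (the object name, method, and default values here are illustrative, not the real MLlib signatures):

```scala
object TrainerDocSketch {  // hypothetical object, for illustration only
  /**
   * Trains a tree model.
   *
   * @param maxDepth maximum depth of the tree (default: 4)           -- illustrative default
   * @param maxBins  maximum number of bins per feature (default: 32) -- illustrative default
   */
  def train(maxDepth: Int = 4, maxBins: Int = 32): Unit = {
    // Real training logic omitted; the point is stating the defaults in the
    // doc comment so that Java callers, who cannot use Scala default
    // arguments, still know what the defaults are.
  }
}
```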
Merged build triggered.
Merged build started.
Merged build triggered.
Merged build started.
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13944/
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13943/
Jenkins, retest this please.
Merged build finished. All automated tests passed.
All automated tests passed.
Thanks! Merged.
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
All automated tests passed.
Annotate developer and experimental APIs in MLlib.

Author: Xiangrui Meng <meng@databricks.com>

Closes apache#298 from mengxr/api and squashes the following commits:

13390e8 [Xiangrui Meng] Merge branch 'master' into api
dc4cbb3 [Xiangrui Meng] mark distributed matrices experimental
6b9f8e2 [Xiangrui Meng] add Experimental annotation
8773d0d [Xiangrui Meng] add DeveloperApi annotation
da31733 [Xiangrui Meng] update developer and experimental tags
555e0fe [Xiangrui Meng] Merge branch 'master' into api
ef1a717 [Xiangrui Meng] mark some constructors private; add default parameters to JavaDoc
00ffbcc [Xiangrui Meng] update tree API annotation
0b674fa [Xiangrui Meng] mark decision tree APIs
86b9e34 [Xiangrui Meng] one pass over APIs of GLMs, NaiveBayes, and ALS
f21d862 [Xiangrui Meng] Merge branch 'master' into api
2b133d6 [Xiangrui Meng] initial annotation of developer and experimental apis
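As a rough illustration of the "mark some constructors private" and "add Experimental annotation" items in the commit list above (a minimal sketch under assumed names; `SimpleModel` and its factory are hypothetical, not actual MLlib classes):

```scala
package org.apache.spark.mllib.sketch  // hypothetical package

import org.apache.spark.annotation.Experimental

/**
 * :: Experimental ::
 * The constructor is restricted to the mllib package so that users obtain
 * instances only through the companion factory, keeping the constructor
 * itself out of the public API surface.
 */
@Experimental
class SimpleModel private[mllib] (val weights: Array[Double])  // hypothetical class

object SimpleModel {
  /** Public factory method; placeholder "training" that averages each feature. */
  def train(data: Seq[Array[Double]]): SimpleModel = {
    val dim = data.headOption.map(_.length).getOrElse(0)
    val sums = Array.fill(dim)(0.0)
    data.foreach(row => row.indices.foreach(i => sums(i) += row(i)))
    new SimpleModel(if (data.isEmpty) sums else sums.map(_ / data.size))
  }
}
```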