-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-2094][SQL] "Exactly once" semantics for DDL and command statements #1071
Conversation
Merged build triggered. |
Merged build started. |
// Set value for key k; the action itself would | ||
// have been performed in QueryExecution eagerly. | ||
case (Some(k), Some(v)) => context.emptyResult | ||
case class SetCommandPhysical( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general we have not been naming our physical operators differently, but have been relying on the package to differentiate as this is less redundant. (i.e. logical.Project
vs. execution.Project
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, just followed @concretevitamin's naming style, would love to rename all these physical commands.
This is looking really good! :) Can you make sure we have tests in hive for |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15717/ |
// Set value for key k. | ||
case (Some(k), Some(v)) => | ||
context.set(k, v) | ||
Array.empty[(String, String)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have we decided to not echo the newly set key-val pair?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just followed the original logic here, but I do agree that echo the pair would be more user friendly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... Just confirmed that Hive 0.12 doesn't return anything in this case (even an "OK" string), so I'd prefer to left it as is to mimic Hive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are already differing from Hive in the behavior of "SET". I don't see a reason to stick to hive semantics in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, made the command returns the newly set pair.
Merged build triggered. |
Merged build started. |
SparkLogicalPlan(queryExecution.executedPlan) | ||
case _ => | ||
baseLogicalPlan | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Realized that many SchemaRDD
actions other than collect()
and DSL methods reuses logicalPlan
and breaks the "exactly once" constraints when planning the local plan (new physical plan node for DDL/command statements are created, causing the side effect taking place again).
So I replaced logicalPlan
with the executed physical plan wrapped with a SparkLogicalPlan
to prevent multiple physical plan instantiations for the same DDL/command statement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. This is probably the problem I was seeing with double "UNCACHE TABLE".
Merged build triggered. |
Merged build started. |
Added the test cases and resolved all the issues brought up in the comments. Thanks for carefully reviewing this! @marmbrus @concretevitamin |
Merged build finished. All automated tests passed. |
All automated tests passed. |
Merged build finished. All automated tests passed. |
All automated tests passed. |
This is awesome! Thanks :) I merged this into master. Merging into 1.0 failed, but I'll just make sure it gets included as part of: #1078 |
#511 and #863 got left out of branch-1.0 since we were really close to the release. Now that they have been tested a little I see no reason to leave them out. Author: Michael Armbrust <michael@databricks.com> Author: witgo <witgo@qq.com> Closes #1078 from marmbrus/branch-1.0 and squashes the following commits: 22be674 [witgo] [SPARK-1841]: update scalatest to version 2.1.5 fc8fc79 [Michael Armbrust] Include #1071 as well. c5d0adf [Michael Armbrust] Update SparkSQL in branch-1.0 to match master.
Updated `JavaSQLContext` and `JavaHiveContext` similar to what we've done to `SQLContext` and `HiveContext` in PR #1071. Added corresponding test case for Spark SQL Java API. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #1085 from liancheng/spark-2094-java and squashes the following commits: 29b8a51 [Cheng Lian] Avoided instantiating JavaSparkContext & JavaHiveContext to workaround test failure 92bb4fb [Cheng Lian] Marked test cases in JavaHiveQLSuite with "ignore" 22aec97 [Cheng Lian] Follow up of PR #1071 for Java API
Updated `JavaSQLContext` and `JavaHiveContext` similar to what we've done to `SQLContext` and `HiveContext` in PR #1071. Added corresponding test case for Spark SQL Java API. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #1085 from liancheng/spark-2094-java and squashes the following commits: 29b8a51 [Cheng Lian] Avoided instantiating JavaSparkContext & JavaHiveContext to workaround test failure 92bb4fb [Cheng Lian] Marked test cases in JavaHiveQLSuite with "ignore" 22aec97 [Cheng Lian] Follow up of PR #1071 for Java API (cherry picked from commit 273afcb) Signed-off-by: Reynold Xin <rxin@apache.org>
…ents ## Related JIRA issues - Main issue: - [SPARK-2094](https://issues.apache.org/jira/browse/SPARK-2094): Ensure exactly once semantics for DDL/Commands - Issues resolved as dependencies: - [SPARK-2081](https://issues.apache.org/jira/browse/SPARK-2081): Undefine output() from the abstract class Command and implement it in concrete subclasses - [SPARK-2128](https://issues.apache.org/jira/browse/SPARK-2128): No plan for DESCRIBE - [SPARK-1852](https://issues.apache.org/jira/browse/SPARK-1852): SparkSQL Queries with Sorts run before the user asks them to - Other related issue: - [SPARK-2129](https://issues.apache.org/jira/browse/SPARK-2129): NPE thrown while lookup a view Two test cases, `join_view` and `mergejoin_mixed`, within the `HiveCompatibilitySuite` are removed from the whitelist to workaround this issue. ## PR Overview This PR defines physical plans for DDL statements and commands and wraps their side effects in a lazy field `PhysicalCommand.sideEffectResult`, so that they are executed eagerly and exactly once. Also, as a positive side effect, now DDL statements and commands can be turned into proper `SchemaRDD`s and let user query the execution results. This PR defines schemas for the following DDL/commands: - EXPLAIN command - `plan`: String, the plan explanation - SET command - `key`: String, the key(s) of the propert(y/ies) being set or queried - `value`: String, the value(s) of the propert(y/ies) being queried - Other Hive native command - `result`: String, execution result returned by Hive **NOTE**: We should refine schemas for different native commands by defining physical plans for them in the future. ## Examples ### EXPLAIN command Take the "EXPLAIN" command as an example, we first execute the command and obtain a `SchemaRDD` at the same time, then query the `plan` field with the schema DSL: ``` scala> loadTestTable("src") ... scala> val q0 = hql("EXPLAIN SELECT key, COUNT(*) FROM src GROUP BY key") ... q0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:98 == Query Plan == ExplainCommandPhysical [plan#11:0] Aggregate false, [key#4], [key#4,SUM(PartialCount#6L) AS c_1#2L] Exchange (HashPartitioning [key#4:0], 200) Exchange (HashPartitioning [key#4:0], 200) Aggregate true, [key#4], [key#4,COUNT(1) AS PartialCount#6L] HiveTableScan [key#4], (MetastoreRelation default, src, None), None scala> q0.select('plan).collect() ... [ExplainCommandPhysical [plan#24:0] Aggregate false, [key#17], [key#17,SUM(PartialCount#19L) AS c_1#2L] Exchange (HashPartitioning [key#17:0], 200) Exchange (HashPartitioning [key#17:0], 200) Aggregate true, [key#17], [key#17,COUNT(1) AS PartialCount#19L] HiveTableScan [key#17], (MetastoreRelation default, src, None), None] scala> ``` ### SET command In this example we query all the properties set in `SQLConf`, register the result as a table, and then query the table with HiveQL: ``` scala> val q1 = hql("SET") ... q1: org.apache.spark.sql.SchemaRDD = SchemaRDD[7] at RDD at SchemaRDD.scala:98 == Query Plan == <SET command: executed by Hive, and noted by SQLContext> scala> q1.registerAsTable("properties") scala> hql("SELECT key, value FROM properties ORDER BY key LIMIT 10").foreach(println) ... == Query Plan == TakeOrdered 10, [key#51:0 ASC] Project [key#51:0,value#52:1] SetCommandPhysical None, None, [key#55:0,value#56:1]), which has no missing parents 14/06/12 12:19:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 5 (SchemaRDD[21] at RDD at SchemaRDD.scala:98 == Query Plan == TakeOrdered 10, [key#51:0 ASC] Project [key#51:0,value#52:1] SetCommandPhysical None, None, [key#55:0,value#56:1]) ... [datanucleus.autoCreateSchema,true] [datanucleus.autoStartMechanismMode,checked] [datanucleus.cache.level2,false] [datanucleus.cache.level2.type,none] [datanucleus.connectionPoolingType,BONECP] [datanucleus.fixedDatastore,false] [datanucleus.identifierFactory,datanucleus1] [datanucleus.plugin.pluginRegistryBundleCheck,LOG] [datanucleus.rdbms.useLegacyNativeValueStrategy,true] [datanucleus.storeManagerType,rdbms] scala> ``` ### "Exactly once" semantics At last, an example of the "exactly once" semantics: ``` scala> val q2 = hql("CREATE TABLE t1(key INT, value STRING)") ... q2: org.apache.spark.sql.SchemaRDD = SchemaRDD[28] at RDD at SchemaRDD.scala:98 == Query Plan == <Native command: executed by Hive> scala> table("t1") ... res9: org.apache.spark.sql.SchemaRDD = SchemaRDD[32] at RDD at SchemaRDD.scala:98 == Query Plan == HiveTableScan [key#58,value#59], (MetastoreRelation default, t1, None), None scala> q2.collect() ... res10: Array[org.apache.spark.sql.Row] = Array([]) scala> ``` As we can see, the "CREATE TABLE" command is executed eagerly right after the `SchemaRDD` is created, and referencing the `SchemaRDD` again won't trigger a duplicated execution. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes apache#1071 from liancheng/exactlyOnceCommand and squashes the following commits: d005b03 [Cheng Lian] Made "SET key=value" returns the newly set key value pair f6c7715 [Cheng Lian] Added test cases for DDL/command statement RDDs 1d00937 [Cheng Lian] Makes SchemaRDD DSLs work for DDL/command statement RDDs 5c7e680 [Cheng Lian] Bug fix: wrong type used in pattern matching 48aa2e5 [Cheng Lian] Refined SQLContext.emptyResult as an empty RDD[Row] cc64f32 [Cheng Lian] Renamed physical plan classes for DDL/commands 74789c1 [Cheng Lian] Fixed failing test cases 0ad343a [Cheng Lian] Added physical plan for DDL and commands to ensure the "exactly once" semantics
Updated `JavaSQLContext` and `JavaHiveContext` similar to what we've done to `SQLContext` and `HiveContext` in PR apache#1071. Added corresponding test case for Spark SQL Java API. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes apache#1085 from liancheng/spark-2094-java and squashes the following commits: 29b8a51 [Cheng Lian] Avoided instantiating JavaSparkContext & JavaHiveContext to workaround test failure 92bb4fb [Cheng Lian] Marked test cases in JavaHiveQLSuite with "ignore" 22aec97 [Cheng Lian] Follow up of PR apache#1071 for Java API
…ents ## Related JIRA issues - Main issue: - [SPARK-2094](https://issues.apache.org/jira/browse/SPARK-2094): Ensure exactly once semantics for DDL/Commands - Issues resolved as dependencies: - [SPARK-2081](https://issues.apache.org/jira/browse/SPARK-2081): Undefine output() from the abstract class Command and implement it in concrete subclasses - [SPARK-2128](https://issues.apache.org/jira/browse/SPARK-2128): No plan for DESCRIBE - [SPARK-1852](https://issues.apache.org/jira/browse/SPARK-1852): SparkSQL Queries with Sorts run before the user asks them to - Other related issue: - [SPARK-2129](https://issues.apache.org/jira/browse/SPARK-2129): NPE thrown while lookup a view Two test cases, `join_view` and `mergejoin_mixed`, within the `HiveCompatibilitySuite` are removed from the whitelist to workaround this issue. ## PR Overview This PR defines physical plans for DDL statements and commands and wraps their side effects in a lazy field `PhysicalCommand.sideEffectResult`, so that they are executed eagerly and exactly once. Also, as a positive side effect, now DDL statements and commands can be turned into proper `SchemaRDD`s and let user query the execution results. This PR defines schemas for the following DDL/commands: - EXPLAIN command - `plan`: String, the plan explanation - SET command - `key`: String, the key(s) of the propert(y/ies) being set or queried - `value`: String, the value(s) of the propert(y/ies) being queried - Other Hive native command - `result`: String, execution result returned by Hive **NOTE**: We should refine schemas for different native commands by defining physical plans for them in the future. ## Examples ### EXPLAIN command Take the "EXPLAIN" command as an example, we first execute the command and obtain a `SchemaRDD` at the same time, then query the `plan` field with the schema DSL: ``` scala> loadTestTable("src") ... scala> val q0 = hql("EXPLAIN SELECT key, COUNT(*) FROM src GROUP BY key") ... q0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:98 == Query Plan == ExplainCommandPhysical [plan#11:0] Aggregate false, [key#4], [key#4,SUM(PartialCount#6L) AS c_1#2L] Exchange (HashPartitioning [key#4:0], 200) Exchange (HashPartitioning [key#4:0], 200) Aggregate true, [key#4], [key#4,COUNT(1) AS PartialCount#6L] HiveTableScan [key#4], (MetastoreRelation default, src, None), None scala> q0.select('plan).collect() ... [ExplainCommandPhysical [plan#24:0] Aggregate false, [key#17], [key#17,SUM(PartialCount#19L) AS c_1#2L] Exchange (HashPartitioning [key#17:0], 200) Exchange (HashPartitioning [key#17:0], 200) Aggregate true, [key#17], [key#17,COUNT(1) AS PartialCount#19L] HiveTableScan [key#17], (MetastoreRelation default, src, None), None] scala> ``` ### SET command In this example we query all the properties set in `SQLConf`, register the result as a table, and then query the table with HiveQL: ``` scala> val q1 = hql("SET") ... q1: org.apache.spark.sql.SchemaRDD = SchemaRDD[7] at RDD at SchemaRDD.scala:98 == Query Plan == <SET command: executed by Hive, and noted by SQLContext> scala> q1.registerAsTable("properties") scala> hql("SELECT key, value FROM properties ORDER BY key LIMIT 10").foreach(println) ... == Query Plan == TakeOrdered 10, [key#51:0 ASC] Project [key#51:0,value#52:1] SetCommandPhysical None, None, [key#55:0,value#56:1]), which has no missing parents 14/06/12 12:19:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 5 (SchemaRDD[21] at RDD at SchemaRDD.scala:98 == Query Plan == TakeOrdered 10, [key#51:0 ASC] Project [key#51:0,value#52:1] SetCommandPhysical None, None, [key#55:0,value#56:1]) ... [datanucleus.autoCreateSchema,true] [datanucleus.autoStartMechanismMode,checked] [datanucleus.cache.level2,false] [datanucleus.cache.level2.type,none] [datanucleus.connectionPoolingType,BONECP] [datanucleus.fixedDatastore,false] [datanucleus.identifierFactory,datanucleus1] [datanucleus.plugin.pluginRegistryBundleCheck,LOG] [datanucleus.rdbms.useLegacyNativeValueStrategy,true] [datanucleus.storeManagerType,rdbms] scala> ``` ### "Exactly once" semantics At last, an example of the "exactly once" semantics: ``` scala> val q2 = hql("CREATE TABLE t1(key INT, value STRING)") ... q2: org.apache.spark.sql.SchemaRDD = SchemaRDD[28] at RDD at SchemaRDD.scala:98 == Query Plan == <Native command: executed by Hive> scala> table("t1") ... res9: org.apache.spark.sql.SchemaRDD = SchemaRDD[32] at RDD at SchemaRDD.scala:98 == Query Plan == HiveTableScan [key#58,value#59], (MetastoreRelation default, t1, None), None scala> q2.collect() ... res10: Array[org.apache.spark.sql.Row] = Array([]) scala> ``` As we can see, the "CREATE TABLE" command is executed eagerly right after the `SchemaRDD` is created, and referencing the `SchemaRDD` again won't trigger a duplicated execution. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes apache#1071 from liancheng/exactlyOnceCommand and squashes the following commits: d005b03 [Cheng Lian] Made "SET key=value" returns the newly set key value pair f6c7715 [Cheng Lian] Added test cases for DDL/command statement RDDs 1d00937 [Cheng Lian] Makes SchemaRDD DSLs work for DDL/command statement RDDs 5c7e680 [Cheng Lian] Bug fix: wrong type used in pattern matching 48aa2e5 [Cheng Lian] Refined SQLContext.emptyResult as an empty RDD[Row] cc64f32 [Cheng Lian] Renamed physical plan classes for DDL/commands 74789c1 [Cheng Lian] Fixed failing test cases 0ad343a [Cheng Lian] Added physical plan for DDL and commands to ensure the "exactly once" semantics
Updated `JavaSQLContext` and `JavaHiveContext` similar to what we've done to `SQLContext` and `HiveContext` in PR apache#1071. Added corresponding test case for Spark SQL Java API. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes apache#1085 from liancheng/spark-2094-java and squashes the following commits: 29b8a51 [Cheng Lian] Avoided instantiating JavaSparkContext & JavaHiveContext to workaround test failure 92bb4fb [Cheng Lian] Marked test cases in JavaHiveQLSuite with "ignore" 22aec97 [Cheng Lian] Follow up of PR apache#1071 for Java API
Related JIRA issues
SPARK-2129: NPE thrown while lookup a view
Two test cases,
join_view
andmergejoin_mixed
, within theHiveCompatibilitySuite
are removed from the whitelist to workaround this issue.PR Overview
This PR defines physical plans for DDL statements and commands and wraps their side effects in a lazy field
PhysicalCommand.sideEffectResult
, so that they are executed eagerly and exactly once. Also, as a positive side effect, now DDL statements and commands can be turned into properSchemaRDD
s and let user query the execution results.This PR defines schemas for the following DDL/commands:
EXPLAIN command
plan
: String, the plan explanationSET command
key
: String, the key(s) of the propert(y/ies) being set or queriedvalue
: String, the value(s) of the propert(y/ies) being queriedOther Hive native command
result
: String, execution result returned by HiveNOTE: We should refine schemas for different native commands by defining physical plans for them in the future.
Examples
EXPLAIN command
Take the "EXPLAIN" command as an example, we first execute the command and obtain a
SchemaRDD
at the same time, then query theplan
field with the schema DSL:SET command
In this example we query all the properties set in
SQLConf
, register the result as a table, and then query the table with HiveQL:"Exactly once" semantics
At last, an example of the "exactly once" semantics:
As we can see, the "CREATE TABLE" command is executed eagerly right after the
SchemaRDD
is created, and referencing theSchemaRDD
again won't trigger a duplicated execution.