Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-29248][SQL] Add PhysicalWriteInfo with number of partitions #25990

Closed
wants to merge 46 commits into from

Conversation

edrevo
Copy link
Contributor

@edrevo edrevo commented Oct 1, 2019

What changes were proposed in this pull request?

When implementing a ScanBuilder, we require the implementor to provide the schema of the data and the number of partitions.

However, when someone is implementing WriteBuilder we only pass them the schema, but not the number of partitions. This is an asymetrical developer experience.

This PR adds a PhysicalWriteInfo interface that is passed to createBatchWriterFactory and createStreamingWriterFactory that adds the number of partitions of the data that is going to be written.

Why are the changes needed?

Passing in the number of partitions on the WriteBuilder would enable data sources to provision their write targets before starting to write. For example:

  • it could be used to provision a Kafka topic with a specific number of partitions
  • it could be used to scale a microservice prior to sending the data to it
  • it could be used to create a DsV2 that sends the data to another spark cluster (currently not possible since the reader wouldn't be able to know the number of partitions)

Does this PR introduce any user-facing change?

No

How was this patch tested?

Not tested yet

@edrevo
Copy link
Contributor Author

edrevo commented Oct 1, 2019

@rdblue @cloud-fan , I'm opening this PR in response to #25945 (comment)

I haven't been able to implement the change successfully, since I am not familiar with the streaming part of Spark and I am not sure where to get the number of partitions in a streaming execution context. If you can provide me some clues as to where I can find the number of partitions, I'm happy to continue with the code changes.

@edrevo edrevo changed the title [SPARK-29248][SQL] Pass in number of partitions to WriteBuilder [SPARK-29248][SQL][WIP] Pass in number of partitions to WriteBuilder Oct 1, 2019
wangyum and others added 2 commits November 15, 2019 15:49
…tions

### What changes were proposed in this pull request?

In order to avoid frequently changing the value of `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions`, we usually set `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` much larger than `spark.sql.shuffle.partitions` after enabling adaptive execution, which causes some bucket map join lose efficacy and add more `ShuffleExchange`.

How to reproduce:
```scala
val bucketedTableName = "bucketed_table"
spark.range(10000).write.bucketBy(500, "id").sortBy("id").mode(org.apache.spark.sql.SaveMode.Overwrite).saveAsTable(bucketedTableName)
val bucketedTable = spark.table(bucketedTableName)
val df = spark.range(8)

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Spark 2.4. spark.sql.adaptive.enabled=false
// We set spark.sql.shuffle.partitions <= 500 every time based on our data in this case.
spark.conf.set("spark.sql.shuffle.partitions", 500)
bucketedTable.join(df, "id").explain()
// Since 3.0. We enabled adaptive execution and set spark.sql.adaptive.shuffle.maxNumPostShufflePartitions to a larger values to fit more cases.
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 1000)
bucketedTable.join(df, "id").explain()
```

```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
*(4) Project [id#5L]
+- *(4) SortMergeJoin [id#5L], [id#7L], Inner
   :- *(1) Sort [id#5L ASC NULLS FIRST], false, 0
   :  +- *(1) Project [id#5L]
   :     +- *(1) Filter isnotnull(id#5L)
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
   +- *(3) Sort [id#7L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#7L, 500), true, [id=apache#49]
         +- *(2) Range (0, 8, step=1, splits=16)
```
vs
```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Project [id#5L]
   +- SortMergeJoin [id#5L], [id#7L], Inner
      :- Sort [id#5L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#5L, 1000), true, [id=apache#93]
      :     +- Project [id#5L]
      :        +- Filter isnotnull(id#5L)
      :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
      +- Sort [id#7L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#7L, 1000), true, [id=apache#92]
            +- Range (0, 8, step=1, splits=16)
```

This PR makes read bucketed tables always obeys `spark.sql.shuffle.partitions` even enabling adaptive execution and set `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` to avoid add more `ShuffleExchange`.

### Why are the changes needed?
Do not degrade performance after enabling adaptive execution.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Unit test.

Closes apache#26409 from wangyum/SPARK-29655.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@edrevo edrevo changed the title [SPARK-29248][SQL][WIP] Pass in number of partitions to WriteBuilder [SPARK-29248][SQL] Pass in number of partitions to WriteBuilder Nov 15, 2019
@edrevo
Copy link
Contributor Author

edrevo commented Nov 15, 2019

@cloud-fan, I have addressed your feedback

@edrevo
Copy link
Contributor Author

edrevo commented Nov 15, 2019

Build failure seems unrelated to my changes

@cloud-fan
Copy link
Contributor

ok to test

…CalendarInterval

### What changes were proposed in this pull request?

We now have two different implementation for multi-units interval strings to CalendarInterval type values.

One is used to covert interval string literals to CalendarInterval. This approach will re-delegate the interval string to spark parser which handles the string as a `singleInterval` -> `multiUnitsInterval` -> eventually call `IntervalUtils.fromUnitStrings`

The other is used in `Cast`, which eventually calls `IntervalUtils.stringToInterval`. This approach is ~10 times faster than the other.

We should unify these two for better performance and simple logic. this pr uses the 2nd approach.

### Why are the changes needed?

We should unify these two for better performance and simple logic.

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

we shall not fail on existing uts

Closes apache#26491 from yaooqinn/SPARK-29870.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@SparkQA
Copy link

SparkQA commented Nov 18, 2019

Test build #113992 has finished for PR 25990 at commit 9ff8ac7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Nov 18, 2019

Test build #114001 has finished for PR 25990 at commit 9ff8ac7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

…og like v2 commands

### What changes were proposed in this pull request?
Add AlterNamespaceSetLocationStatement, AlterNamespaceSetLocation, AlterNamespaceSetLocationExec to make ALTER DATABASE (SET LOCATION) look up catalog like v2 commands.
And also refine the code of AlterNamespaceSetProperties, AlterNamespaceSetPropertiesExec, DescribeNamespace, DescribeNamespaceExec to use SupportsNamespaces instead of CatalogPlugin for catalog parameter.

### Why are the changes needed?
It's important to make all the commands have the same catalog/namespace resolution behavior, to avoid confusing end-users.

### Does this PR introduce any user-facing change?
Yes, add "ALTER NAMESPACE ... SET LOCATION" whose function is same as "ALTER DATABASE ... SET LOCATION" and "ALTER SCHEMA ... SET LOCATION".

### How was this patch tested?
New unit tests

Closes apache#26562 from fuwhu/SPARK-29859.

Authored-by: fuwhu <bestwwg@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Copy link
Contributor

@edrevo can you update the PR title and description?

@edrevo edrevo changed the title [SPARK-29248][SQL] Pass in number of partitions to WriteBuilder [SPARK-29248][SQL] Add PhysicalWriteInfo with number of partitions Nov 18, 2019
### What changes were proposed in this pull request?

Checked with SQL Standard and PostgreSQL

> CHAR is equivalent to CHARACTER. DEC is equivalent to DECIMAL. INT is equivalent to INTEGER. VARCHAR is equivalent to CHARACTER VARYING. ...

```sql
postgres=# select dec '1.0';
numeric
---------
1.0
(1 row)

postgres=# select CHARACTER '. second';
  bpchar
----------
 . second
(1 row)

postgres=# select CHAR '. second';
  bpchar
----------
 . second
(1 row)
```

### Why are the changes needed?

For better ansi support
### Does this PR introduce any user-facing change?

yes, we add character as char and dec as decimal

### How was this patch tested?

add ut

Closes apache#26574 from yaooqinn/SPARK-29941.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…hen regen golden file with --SET --import both specified

### What changes were proposed in this pull request?

When regenerating golden files, the set operations via `--SET` will not be done, but those with --import should be exceptions because we need the set command.

### Why are the changes needed?

fix test tool.
### Does this PR introduce any user-facing change?

### How was this patch tested?

add ut, but I'm not sure we need these tests for tests itself.
cc maropu cloud-fan

Closes apache#26557 from yaooqinn/SPARK-29873.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
* {@link WriteBuilder}.
*/
@Evolving
public interface LogicalWriteInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are write options not part of LogicalWriteInfo?

@@ -27,7 +27,6 @@ import scala.util.{Failure, Try}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileSplit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this looks like an unrelated change that causes this to touch an extra file. Can you remove it?

@rdblue
Copy link
Contributor

rdblue commented Nov 18, 2019

What is the benefit of adding LogicalWriteInfo? As far as I can tell, this just changes how schema and query ID are passed to the logical write builder. That doesn't seem very useful to me and this would be a much smaller change without it.

I'd prefer to separate the change to use LogicalWriteInfo into its own PR and focus this one on getting the number of partitions to the stream writer.

@cloud-fan
Copy link
Contributor

This PR starts to add LogicalWriteInfo but then finds out that it can't hold number of partitions. Now LogicalWriteInfo is not necessary and agree with @rdblue that it's better to have a separated PR.

falaki and others added 7 commits November 19, 2019 09:04
…ction required by user function

### What changes were proposed in this pull request?
The implementation for walking through the user function AST and picking referenced variables and functions, had an optimization to skip a branch if it had already seen it. This runs into an interesting problem in the following example

```
df <- createDataFrame(data.frame(x=1))
f1 <- function(x) x + 1
f2 <- function(x) f1(x) + 2
dapplyCollect(df, function(x) { f1(x); f2(x) })
```
Results in error:
```
org.apache.spark.SparkException: R computation failed with
 Error in f1(x) : could not find function "f1"
Calls: compute -> computeFunc -> f2
```

### Why are the changes needed?
Bug fix

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Unit tests in `test_utils.R`

Closes apache#26429 from falaki/SPARK-29777.

Authored-by: Hossein <hossein@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…Factory.enabled' into StaticSQLConf.scala

### What changes were proposed in this pull request?

This PR is a followup of apache#26530 and proposes to move the configuration `spark.sql.defaultUrlStreamHandlerFactory.enabled` to `StaticSQLConf.scala` for consistency.

### Why are the changes needed?

To put the similar configurations together and for readability.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manually tested as described in apache#26530.

Closes apache#26570 from HyukjinKwon/SPARK-25694.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?

This is a followup of apache#26418. This PR removed `CalendarInterval`'s `toString` with an unfinished changes.

### Why are the changes needed?

1. Ideally we should make each PR isolated and separate targeting one issue without touching unrelated codes.

2. There are some other places where the string formats were exposed to users. For example:

    ```scala
    scala> sql("select interval 1 days as a").selectExpr("to_csv(struct(a))").show()
    ```
    ```
    +--------------------------+
    |to_csv(named_struct(a, a))|
    +--------------------------+
    |      "CalendarInterval...|
    +--------------------------+
    ```

3.  Such fixes:

    ```diff
     private def writeMapData(
        map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = {
      val keyArray = map.keyArray()
    + val keyString = mapType.keyType match {
    +   case CalendarIntervalType =>
    +    (i: Int) => IntervalUtils.toMultiUnitsString(keyArray.getInterval(i))
    +   case _ => (i: Int) => keyArray.get(i, mapType.keyType).toString
    + }
    ```

    can cause performance regression due to type dispatch for each map.

### Does this PR introduce any user-facing change?

Yes, see 2. case above.

### How was this patch tested?

Manually tested.

Closes apache#26572 from HyukjinKwon/SPARK-29783.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
…estSuite

### What changes were proposed in this pull request?
This PR add guides for `ThriftServerQueryTestSuite`.

### Why are the changes needed?
Add guides

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
N/A

Closes apache#26587 from wangyum/SPARK-28527-FOLLOW-UP.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…o SmallInt and TinyInt in JDBCUtils

This reverts commit f7e53865 i.e PR apache#26301 from master

Closes apache#26583 from shivsood/revert_29644_master.

Authored-by: shivsood <shivsood@microsoft.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?

This PR adds `ALTER TABLE a.b.c RENAME TO x.y.x` support for V2 catalogs.

### Why are the changes needed?

The current implementation doesn't support this command V2 catalogs.

### Does this PR introduce any user-facing change?

Yes, now the renaming table works for v2 catalogs:
```
scala> spark.sql("SHOW TABLES IN testcat.ns1.ns2").show
+---------+---------+
|namespace|tableName|
+---------+---------+
|  ns1.ns2|      old|
+---------+---------+

scala> spark.sql("ALTER TABLE testcat.ns1.ns2.old RENAME TO testcat.ns1.ns2.new").show

scala> spark.sql("SHOW TABLES IN testcat.ns1.ns2").show
+---------+---------+
|namespace|tableName|
+---------+---------+
|  ns1.ns2|      new|
+---------+---------+
```
### How was this patch tested?

Added unit tests.

Closes apache#26539 from imback82/rename_table.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@edrevo
Copy link
Contributor Author

edrevo commented Nov 19, 2019

Continued in #26591 Sorry for closing this PR!

@rdblue
Copy link
Contributor

rdblue commented Nov 21, 2019

@edrevo, thanks for pushing this to completion!

cloud-fan pushed a commit that referenced this pull request Jan 6, 2020
### What changes were proposed in this pull request?
Adding a `LogicalWriteInfo` interface as suggested by cloud-fan in #25990 (comment)

### Why are the changes needed?
It provides compile-time guarantees where we previously had none, which will make it harder to introduce bugs in the future.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Compiles and passes tests

Closes #26678 from edrevo/add-logical-write-info.

Lead-authored-by: Ximo Guanter <joaquin.guantergonzalbez@telefonica.com>
Co-authored-by: Ximo Guanter
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.