
[SPARK-50525][SQL] Define InsertMapSortInRepartitionExpressions Optimizer Rule #49144

Closed. Wants to merge 6 commits from features/map_repartition into master.

Conversation

ostronaut
Contributor

@ostronaut ostronaut commented Dec 11, 2024

What changes were proposed in this pull request?

In the current version of Spark, it's possible to use a MapType column for repartitioning. However, MapData does not implement equals and hashCode (per SPARK-9415 and [SPARK-16135][SQL] Remove hashCode and equals in ArrayBasedMapData). As a result, equal maps can produce different hash values.

Attempting to run the xxhash64 or hash function on a MapType throws: org.apache.spark.sql.catalyst.ExtendedAnalysisException: [DATATYPE_MISMATCH.HASH_MAP_TYPE] Cannot resolve "xxhash64(value)" due to data type mismatch: Input to the function `xxhash64` cannot contain elements of the "MAP" type. In Spark, same maps may have different hashcode, thus hash expressions are prohibited on "MAP" elements. To restore previous behavior set "spark.sql.legacy.allowHashOnMapType" to "true".;

Also, when trying to run ds.distinct(col("value")), where value has MapType, the following exception is thrown: org.apache.spark.sql.catalyst.ExtendedAnalysisException: [UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE] The feature is not supported: Cannot have MAP type columns in DataFrame which calls set operations (INTERSECT, EXCEPT, etc.), but the type of column `value` is "MAP<INT, STRING>".;
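Both failures above can be reproduced with a short spark-shell sketch (illustrative only, not part of this PR; it assumes an active SparkSession `spark`, and uses `distinct()` in place of the `distinct(col("value"))` call quoted above):

```scala
// Illustrative spark-shell sketch of the errors quoted above.
// Assumes an active SparkSession `spark`; not part of this PR.
import org.apache.spark.sql.functions.{col, xxhash64}
import spark.implicits._

val ds = Seq(Map(1 -> "a"), Map(2 -> "b")).toDF("value")

// Throws DATATYPE_MISMATCH.HASH_MAP_TYPE:
ds.select(xxhash64(col("value"))).show()

// Throws UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE:
ds.distinct().show()
```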

With the above considerations, a new InsertMapSortInRepartitionExpressions Rule[LogicalPlan] was implemented to insert map_sort for every MapType expression in RepartitionByExpression.partitionExpressions.
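A rough sketch of the rule described above (not the merged implementation: imports are elided, and the recursive handling of nested map types behind `mapTypeExistsRecursively` is simplified to a top-level check):

```scala
// Sketch only. The actual rule must also handle maps nested inside
// structs/arrays; here we only wrap top-level MapType expressions.
object InsertMapSortInRepartitionExpressions extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan =
    // Prune the traversal so only plans containing a repartition are visited.
    plan.transformUpWithPruning(_.containsPattern(REPARTITION_OPERATION)) {
      case rep: RepartitionByExpression
          if rep.partitionExpressions.exists(mapTypeExistsRecursively) =>
        val newExprs = rep.partitionExpressions.map {
          // Wrap map-typed partition expressions so equal maps hash equally.
          case e if e.dataType.isInstanceOf[MapType] => MapSort(e)
          case e => e
        }
        rep.copy(partitionExpressions = newExprs)
    }
}
```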

Why are the changes needed?

To keep the repartition API consistent for MapType columns.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Dec 11, 2024
@ostronaut
Contributor Author

Hi @harshmotw-db, @hvanhovell, @cloud-fan, in continuation of the #49080 discussion and SPARK-50525.

@ostronaut ostronaut changed the title [SPARK-50525] Prohibit partitioning by MapType [SPARK-50525][SQL] Prohibit partitioning by MapType Dec 11, 2024
@ostronaut ostronaut force-pushed the features/map_repartition branch from 5c01b4e to 37ef34f Compare December 11, 2024 12:19
@ostronaut ostronaut requested a review from MaxGekk December 13, 2024 15:53
@cloud-fan
Contributor

It's a breaking change, shall we just support it now? We can handle it in InsertMapSortInGroupingExpressions?

@ostronaut
Contributor Author

@cloud-fan this change will impact users indeed, but it was done in consideration of #49080 and #48909. If you think it would be better to extend repartition in a similar fashion to InsertMapSortInGroupingExpressions, we can discuss that as well. We will have a bit of inconsistency in that case.

@cloud-fan
Contributor

consistency is good but breaking change is scary, even if the old behavior returns the wrong result.

@ostronaut
Contributor Author

Okay, thank you for your point @cloud-fan! Just to double-check that I've got everything right before further implementation: instead of prohibiting map expressions for partitioning, we can implement a Rule[LogicalPlan] named InsertMapSortInPartitioningExpressions (any other name can be recommended) where we replace MapType expressions with MapSort. Sorting the map will then produce the same hash code for equal maps, per the InterpretedHashFunction.hash logic (where the order of elements matters for the final hash value):

```scala
case map: MapData =>
  val (kt, vt) = dataType match {
    case udt: UserDefinedType[_] =>
      val mapType = udt.sqlType.asInstanceOf[MapType]
      mapType.keyType -> mapType.valueType
    case MapType(kt, vt, _) => kt -> vt
  }
  val keys = map.keyArray()
  val values = map.valueArray()
  var result = seed
  var i = 0
  while (i < map.numElements()) {
    result = hash(keys.get(i, kt), kt, result)
    result = hash(values.get(i, vt), vt, result)
    i += 1
  }
  result
```

Please let me know if I'm missing something or if you have any other recommendations!

@ostronaut
Contributor Author

@MaxGekk could you please check this PR again? What are your thoughts?

@cloud-fan
Contributor

We can rename InsertMapSortInGroupingExpressions to InsertMapSortExpression and match the Repartition plan in this rule to insert MapSort expression

@hvanhovell
Contributor

@ostronaut what does it take to get this PR moving?

@ostronaut
Contributor Author

> @ostronaut what does it take to get this PR moving?

No blockers for now! If this is fine, I will implement the changes as suggested by @cloud-fan, where the map will be replaced with a sorted map in the same way as InsertMapSortInGroupingExpressions.

If you have any other comments, please let me know!

@ostronaut
Contributor Author

@cloud-fan, @hvanhovell, @MaxGekk ready for review after applying all suggestions!

* SELECT * FROM TABLE DISTRIBUTE BY map_column =>
* SELECT * FROM TABLE DISTRIBUTE BY map_sort(map_column)
*/
object InsertMapSortInRepartitionExpressions extends Rule[LogicalPlan] {
Contributor

can we combine these two rules so that we only need to traverse the plan once?

Contributor Author

@ostronaut ostronaut Jan 6, 2025


Initially I wanted to do the same, but the logic for InsertMapSortInGroupingExpressions and InsertMapSortInRepartitionExpressions is quite different: grouping produces a new output after applying the changes, while repartition only updates the existing RepartitionByExpression by replacing partitionExpressions.
Also, there is a dependency between InsertMapSortInGroupingExpressions and PullOutGroupingExpressions, as mentioned in this comment.

For those reasons I decided to split them into separate rules. But if you think the performance saving from the reduced traversal will be significant, we can combine them.

Contributor

I think it's fine to use transformUpWithNewOutput for both. If we hit RepartitionByExpression, we return Nil as the new output.

Contributor Author

I think this would make things more complex, since there is no need to return a new output for RepartitionByExpression. Also, to avoid traversing every plan, we added two conditions to InsertMapSortInRepartitionExpressions:

  1. _.containsPattern(REPARTITION_OPERATION) as the condition for transformUpWithPruning.
  2. if rep.partitionExpressions.exists(mapTypeExistsRecursively) in the case match.

So I would keep these two rules independent of each other.

@ostronaut ostronaut force-pushed the features/map_repartition branch from 0d7adec to 75e323b Compare January 6, 2025 18:13
@ostronaut ostronaut changed the title [SPARK-50525][SQL] Prohibit partitioning by MapType [SPARK-50525][SQL] Define InsertMapSortInRepartitionExpressions Optimizer Rule Jan 6, 2025
@ostronaut ostronaut force-pushed the features/map_repartition branch from d6592d2 to 20c76e4 Compare January 7, 2025 11:53
@cloud-fan
Contributor

There is a test failure in AdaptiveQueryExecSuite, may be related?

@ostronaut
Contributor Author

There is a test failure in AdaptiveQueryExecSuite, may be related?

- SPARK-47148: AQE should avoid to submit shuffle job on cancellation failed on the initial run but succeeded after a re-run, so I don't think it's related to this PR. Most likely that test is flaky.

Note: the failure was: scala.package.Seq.apply[org.apache.spark.SparkException](error).++[Throwable](scala.Option.apply[Throwable](error.getCause())).++[Throwable](scala.Predef.wrapRefArray[Throwable](error.getSuppressed())).exists(((e: Throwable) => e.getMessage().!=(null).&&(e.getMessage().contains("coalesce test error")))) was false (AdaptiveQueryExecSuite.scala:940)

@ostronaut
Contributor Author

Hi @cloud-fan, @MaxGekk. Can we merge this PR?

@cloud-fan
Contributor

The Spark Connect failure is unrelated, thanks, merging to master!

@cloud-fan cloud-fan closed this in a4f2870 Jan 10, 2025
@ostronaut ostronaut deleted the features/map_repartition branch January 10, 2025 08:06
Member

@dongjoon-hyun dongjoon-hyun left a comment


Hi, @ostronaut and @cloud-fan . This seems to break non-ANSI mode. Could you take a look at the CI failures?

@dongjoon-hyun
Member

According to the logs, 4 suites failed due to this.

org.apache.spark.sql.DataFrameSuite
org.apache.spark.sql.DSV2CharVarcharTestSuite
org.apache.spark.sql.FileSourceCharVarcharTestSuite
org.apache.spark.sql.HiveCharVarcharTestSuite

@@ -428,6 +428,33 @@ class DataFrameSuite extends QueryTest
}
}

test("repartition by MapType") {
Seq("int", "long", "float", "double", "decimal(10, 2)", "string", "varchar(6)").foreach { dt =>
Member

This newly added test case fails in NON-ANSI mode on the "decimal(10, 2)" case.

@dongjoon-hyun
Member

dongjoon-hyun commented Jan 12, 2025

In addition to the newly added test case, the other three failures (including the Hive module) seem to originate from another PR, not this optimizer PR.

(Screenshot of the CI failures, 2025-01-12.)

@dongjoon-hyun
Member

Here is a follow-up.

dongjoon-hyun added a commit that referenced this pull request Jan 13, 2025
…y MapType` test assumption

### What changes were proposed in this pull request?

This is a follow-up to recover the NON-ANSI mode CI failure by adding a test assumption clearly.
- #49144

### Why are the changes needed?

**BEFORE**
```
$ SPARK_ANSI_SQL_MODE=false build/sbt "sql/testOnly *.DataFrameSuite -- -z MapType"
[info] *** 1 TEST FAILED ***
[error] Failed tests:
[error] 	org.apache.spark.sql.DataFrameSuite
```

**AFTER**
```
$ SPARK_ANSI_SQL_MODE=false build/sbt "sql/testOnly *.DataFrameSuite -- -z MapType"
[info] All tests passed.
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually test with `SPARK_ANSI_SQL_MODE=false`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49457 from dongjoon-hyun/SPARK-50525.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
cloud-fan pushed a commit that referenced this pull request Feb 20, 2025
…icits`

### What changes were proposed in this pull request?

Related to #49144. Scala 2.12 compilation is failing with `ArrayImplicits`, which is used by the `ShowTablesExec.isTempView` method. This PR removes `org.apache.spark.util.ArrayImplicits._` from `ShowTablesExec` and uses the default Seq instead.

### Why are the changes needed?

To fix the failing Scala 2.12 compilation issue.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests and actions run.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50008 from ostronaut/features/ShowTablesExec-remove-ArrayImplicits.

Authored-by: Dima <dimanowq@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>