diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 2feb59317d8a8..adeb1d5413116 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -1478,13 +1478,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     val table1 = "tab1e1"
     val table2 = "table2"
 
-    Seq((2, 4), (4, 2), (2, 6), (6, 2)).foreach {
+    Seq((2, 4), (4, 2), (2, 6), (6, 2), (3, 1), (5, 15)).foreach {
       case (table1buckets, table2buckets) =>
         catalog.clearTables()
 
-        val partition1 = Array(identity("data"),
+        val partition1 = Array(identity("store_id"),
           bucket(table1buckets, "dept_id"))
-        val partition2 = Array(bucket(3, "store_id"),
+        val partition2 = Array(identity("store_id"),
           bucket(table2buckets, "dept_id"))
 
         createTable(table1, schema2, partition1)
@@ -1555,6 +1555,61 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
+  test("SPARK-47094: Compatible buckets does not support SPJ when bucket numbers not divisible") {
+    val table1 = "tab1e1"
+    val table2 = "table2"
+
+    Seq((2, 5), (5, 3)).foreach {
+      case (table1buckets, table2buckets) =>
+        catalog.clearTables()
+
+        val partition1 = Array(identity("store_id"),
+          bucket(table1buckets, "dept_id"))
+        val partition2 = Array(identity("store_id"),
+          bucket(table2buckets, "dept_id"))
+
+        createTable(table1, schema2, partition1)
+        sql(s"INSERT INTO testcat.ns.$table1 VALUES " +
+          "(0, 0, 'aa'), " +
+          "(1, 0, 'ab'), " +
+          "(2, 1, 'ac') "
+        )
+
+        createTable(table2, schema2, partition2)
+        sql(s"INSERT INTO testcat.ns.$table2 VALUES " +
+          "(6, 0, '01'), " +
+          "(5, 1, '02'), " +
+          "(5, 1, '03') "
+        )
+
+        withSQLConf(
+          SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+          SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+          SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+          SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true",
+          SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+          val df = sql(
+            s"""
+              |${selectWithMergeJoinHint("t1", "t2")}
+              |t1.store_id, t2.store_id, t1.dept_id, t2.dept_id, t1.data, t2.data
+              |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+              |ON t1.dept_id = t2.dept_id
+              |ORDER BY t1.store_id, t1.dept_id, t1.data, t2.data
+              |""".stripMargin)
+
+          val shuffles = collectShuffles(df.queryExecution.executedPlan)
+          assert(shuffles.nonEmpty, "SPJ should not trigger")
+
+          checkAnswer(df, Seq(
+            Row(0, 6, 0, 0, "aa", "01"),
+            Row(1, 6, 0, 0, "ab", "01"),
+            Row(2, 5, 1, 1, "ac", "02"),
+            Row(2, 5, 1, 1, "ac", "03")
+          ))
+        }
+    }
+  }
+
   test("SPARK-47094: Compatible buckets does not support SPJ with " +
     "push-down values or partially-clustered") {
     val table1 = "tab1e1"
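
Note on the bucket-count pairs above: the test names and data suggest that two bucket transforms are treated as compatible for storage-partitioned joins only when one bucket count divides the other, so pairs such as (2, 4), (6, 2), (3, 1) and (5, 15) keep SPJ, while (2, 5) and (5, 3) fall back to a shuffle. Below is a minimal, self-contained Scala sketch of that divisibility rule; the helper name is hypothetical and not Spark's actual API.

// Sketch of the divisibility rule the test pairs above appear to exercise.
// `bucketsCompatible` is a hypothetical helper, not part of Spark's API.
object BucketCompatibilitySketch {
  def bucketsCompatible(buckets1: Int, buckets2: Int): Boolean = {
    val small = math.min(buckets1, buckets2)
    val large = math.max(buckets1, buckets2)
    // SPJ can regroup partitions only when one count evenly divides the other.
    large % small == 0
  }

  def main(args: Array[String]): Unit = {
    // Pairs from the existing positive test: all divisible, so SPJ is expected.
    val spjPairs = Seq((2, 4), (4, 2), (2, 6), (6, 2), (3, 1), (5, 15))
    assert(spjPairs.forall { case (a, b) => bucketsCompatible(a, b) })
    // Pairs from the new negative test: not divisible, so a shuffle is expected.
    val shufflePairs = Seq((2, 5), (5, 3))
    assert(shufflePairs.forall { case (a, b) => !bucketsCompatible(a, b) })
    println("divisibility rule matches the test expectations")
  }
}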