Skip to content

Commit

Permalink
test case for non-divisible bucket number & odd divisible number
Browse files Browse the repository at this point in the history
  • Loading branch information
hpal committed Feb 28, 2024
1 parent 5067852 commit 97eed0a
Showing 1 changed file with 58 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1478,13 +1478,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
val table1 = "tab1e1"
val table2 = "table2"

Seq((2, 4), (4, 2), (2, 6), (6, 2)).foreach {
Seq((2, 4), (4, 2), (2, 6), (6, 2), (3, 1), (5, 15)).foreach {
case (table1buckets, table2buckets) =>
catalog.clearTables()

val partition1 = Array(identity("data"),
val partition1 = Array(identity("store_id"),
bucket(table1buckets, "dept_id"))
val partition2 = Array(bucket(3, "store_id"),
val partition2 = Array(identity("store_id"),
bucket(table2buckets, "dept_id"))

createTable(table1, schema2, partition1)
Expand Down Expand Up @@ -1555,6 +1555,61 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
}
}

// Checks that, for bucket-count pairs where neither count is a multiple of the other, the
// executed join plan still contains at least one shuffle and the join returns the expected rows.
test("SPARK-47094: Compatible buckets does not support SPJ when bucket numbers not divisible") {
  val table1 = "tab1e1"
  val table2 = "table2"

  // (table1 bucket count, table2 bucket count); neither number in a pair divides the other.
  val nonDivisibleBucketPairs = Seq((2, 5), (5, 3))

  for ((leftBuckets, rightBuckets) <- nonDivisibleBucketPairs) {
    // Start every iteration from an empty catalog so both tables can be re-created.
    catalog.clearTables()

    // Both tables use identity(store_id) plus a bucket transform on dept_id; only the
    // bucket counts differ between the two sides.
    val leftTransforms = Array(identity("store_id"),
      bucket(leftBuckets, "dept_id"))
    val rightTransforms = Array(identity("store_id"),
      bucket(rightBuckets, "dept_id"))

    createTable(table1, schema2, leftTransforms)
    val leftRows =
      "(0, 0, 'aa'), " +
        "(1, 0, 'ab'), " +
        "(2, 1, 'ac') "
    sql(s"INSERT INTO testcat.ns.$table1 VALUES " + leftRows)

    createTable(table2, schema2, rightTransforms)
    val rightRows =
      "(6, 0, '01'), " +
        "(5, 1, '02'), " +
        "(5, 1, '03') "
    sql(s"INSERT INTO testcat.ns.$table2 VALUES " + rightRows)

    // Session settings under which the join is planned and executed.
    val joinConfs = Seq(
      SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
      SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
      SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
      SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true",
      SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true")

    withSQLConf(joinConfs: _*) {
      // The join key is dept_id only, i.e. the bucketed column.
      val joinQuery =
        s"""
           |${selectWithMergeJoinHint("t1", "t2")}
           |t1.store_id, t2.store_id, t1.dept_id, t2.dept_id, t1.data, t2.data
           |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
           |ON t1.dept_id = t2.dept_id
           |ORDER BY t1.store_id, t1.dept_id, t1.data, t2.data
           |""".stripMargin
      val df = sql(joinQuery)

      // At least one shuffle must remain in the executed plan for these bucket counts.
      val executedPlan = df.queryExecution.executedPlan
      val shuffles = collectShuffles(executedPlan)
      assert(shuffles.nonEmpty, "SPJ should not trigger")

      // The join result itself must still be correct.
      val expectedRows = Seq(
        Row(0, 6, 0, 0, "aa", "01"),
        Row(1, 6, 0, 0, "ab", "01"),
        Row(2, 5, 1, 1, "ac", "02"),
        Row(2, 5, 1, 1, "ac", "03"))
      checkAnswer(df, expectedRows)
    }
  }
}

test("SPARK-47094: Compatible buckets does not support SPJ with " +
"push-down values or partially-clustered") {
val table1 = "tab1e1"
Expand Down

0 comments on commit 97eed0a

Please sign in to comment.