Move DataFrameRepartitionSuite to DataFrameSuite
ostronaut committed Jan 6, 2025
1 parent d7efa2b commit 75e323b
Showing 2 changed files with 27 additions and 86 deletions.

51 changes: 0 additions & 51 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameRepartitionSuite.scala
This file was deleted.

62 changes: 27 additions & 35 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -371,41 +371,6 @@ class DataFrameSuite extends QueryTest
    }
  }

test("SPARK-50525 - cannot partition by map columns") {
val df = sql("select map(id, id) as m, id % 5 as id from range(0, 100, 1, 5)")
// map column
checkError(
exception = intercept[AnalysisException](df.repartition(5, col("m"))),
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_MAP",
parameters = Map(
"expr" -> "\"m\"",
"dataType" -> "\"MAP<BIGINT, BIGINT>\"")
)
// map producing expression
checkError(
exception = intercept[AnalysisException](df.repartition(5, map(col("id"), col("id")))),
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_MAP",
parameters = Map(
"expr" -> "\"map(id, id)\"",
"dataType" -> "\"MAP<BIGINT, BIGINT>\"")
)
// SQL
withTempView("tv") {
df.createOrReplaceTempView("tv")
checkError(
exception = intercept[AnalysisException](sql("SELECT * FROM tv DISTRIBUTE BY m")),
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_MAP",
parameters = Map(
"expr" -> "\"m\"",
"dataType" -> "\"MAP<BIGINT, BIGINT>\""),
context = ExpectedContext(
fragment = "DISTRIBUTE BY m",
start = 17,
stop = 31)
)
}
}
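
For context, the test removed above pinned the previous behavior: repartitioning by a map column, by a map-producing expression, or via DISTRIBUTE BY on a map all failed analysis with UNSUPPORTED_FEATURE.PARTITION_BY_MAP. A minimal sketch of a call that used to fail, assuming a local SparkSession named `spark` (illustrative, not part of the commit):

    import org.apache.spark.sql.functions.col
    // Build a DataFrame with a map column, as in the deleted test.
    val df = spark.sql("select map(id, id) as m from range(0, 100, 1, 5)")
    // Previously raised AnalysisException with condition UNSUPPORTED_FEATURE.PARTITION_BY_MAP.
    df.repartition(5, col("m"))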

test("repartition with SortOrder") {
// passing SortOrder expressions to .repartition() should result in an informative error

@@ -463,6 +428,33 @@ class DataFrameSuite extends QueryTest
    }
  }

test("repartition by MapType") {
Seq("int", "long", "float", "double", "decimal(10, 2)", "string", "varchar(6)").foreach { dt =>
val df = spark.range(20)
.withColumn("c1",
when(col("id") % 3 === 1, typedLit(Map(1 -> 1)))
.when(col("id") % 3 === 2, typedLit(Map(1 -> 1, 2 -> 2)))
.otherwise(typedLit(Map(2 -> 2, 1 -> 1))).cast(s"map<$dt, $dt>"))
.withColumn("c2", typedLit(Map(1 -> null)).cast(s"map<$dt, $dt>"))
.withColumn("c3", lit(null).cast(s"map<$dt, $dt>"))

assertPartitionNumber(df.repartition(4, col("c1")), 2)
assertPartitionNumber(df.repartition(4, col("c2")), 1)
assertPartitionNumber(df.repartition(4, col("c3")), 1)
assertPartitionNumber(df.repartition(4, col("c1"), col("c2")), 2)
assertPartitionNumber(df.repartition(4, col("c1"), col("c3")), 2)
assertPartitionNumber(df.repartition(4, col("c1"), col("c2"), col("c3")), 2)
assertPartitionNumber(df.repartition(4, col("c2"), col("c3")), 2)
}
}
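
The expected maxima follow from map value equality: Map(1 -> 1, 2 -> 2) and Map(2 -> 2, 1 -> 1) contain the same entries and compare equal, so c1 takes only two distinct values and can fill at most two partitions, while the constant c2 and the all-null c3 each collapse into one. A minimal standalone sketch of the same effect, assuming a local `spark` session (illustrative only):

    import org.apache.spark.sql.functions._
    val df = spark.range(10).withColumn("m",
      when(col("id") % 2 === 0, typedLit(Map(1 -> 1)))
        .otherwise(typedLit(Map(2 -> 2))))
    // Two distinct map values, so at most two of the eight partitions are non-empty.
    df.repartition(8, col("m")).groupBy(spark_partition_id()).count().show()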

  private def assertPartitionNumber(df: => DataFrame, max: Int): Unit = {
    val dfGrouped = df.groupBy(spark_partition_id()).count()
    // The number of non-empty partitions in the result can be lower than max, but never higher.
    assert(dfGrouped.count() <= max, dfGrouped.queryExecution.simpleString)
  }
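
The helper leans on spark_partition_id(): grouping by it produces one row per non-empty partition, so dfGrouped.count() equals the number of non-empty partitions in the repartitioned result. A quick interactive check of the trick, assuming a local `spark` session (illustrative only):

    import org.apache.spark.sql.functions.spark_partition_id
    // One output row per non-empty partition, with the number of rows it holds.
    spark.range(100).repartition(4).groupBy(spark_partition_id()).count().show()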

test("coalesce") {
intercept[IllegalArgumentException] {
testData.select("key").coalesce(0)
