[SPARK-50525] Define analysis check for RepartitionByExpression for MapType
ostronaut committed Dec 11, 2024
1 parent faef3fa commit 37ef34f
Showing 3 changed files with 63 additions and 1 deletion.
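For context, here is a minimal, self-contained sketch of the user-facing behavior this commit adds. It is illustrative only: the object name, the local SparkSession setup, and the println reporting are assumptions of the sketch, not part of the change; the authoritative coverage is the DataFrameSuite test further down. With the new check, repartitioning by a MAP-typed column or a map-producing expression fails analysis with UNSUPPORTED_FEATURE.PARTITION_BY_MAP, while non-map partitioning keys keep working.

import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.functions.{col, map}

// Hypothetical driver object, only for illustrating the new analysis check.
object PartitionByMapExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("partition-by-map").getOrCreate()

    // A MAP<BIGINT, BIGINT> column `m` plus a plain BIGINT column `id`.
    val df = spark.sql("SELECT map(id, id) AS m, id % 5 AS id FROM range(0, 100, 1, 5)")

    // Partitioning by the map column is now rejected at analysis time.
    try {
      df.repartition(5, col("m"))
    } catch {
      case e: AnalysisException =>
        println(s"Rejected as expected (PARTITION_BY_MAP): ${e.getMessage}")
    }

    // A map-producing expression is rejected the same way.
    try {
      df.repartition(5, map(col("id"), col("id")))
    } catch {
      case e: AnalysisException =>
        println(s"Rejected as expected (PARTITION_BY_MAP): ${e.getMessage}")
    }

    // Partitioning by a non-map column still works.
    df.repartition(5, col("id")).collect()

    spark.stop()
  }
}

The same condition also surfaces for SQL DISTRIBUTE BY over a map column, which the test below exercises via a temporary view.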
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -5314,6 +5314,11 @@
"Parameter markers are not allowed in <statement>."
]
},
"PARTITION_BY_MAP" : {
"message" : [
"Cannot use MAP producing expressions to partition a DataFrame, but the type of expression <expr> is <dataType>."
]
},
"PARTITION_BY_VARIANT" : {
"message" : [
"Cannot use VARIANT producing expressions to partition a DataFrame, but the type of expression <expr> is <dataType>."
17 changes: 16 additions & 1 deletion sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -103,6 +103,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsBase {
case _ => None
}

protected def mapExprInPartitionExpression(plan: LogicalPlan): Option[Expression] =
plan match {
case r: RepartitionByExpression =>
r.partitionExpressions.find(e => hasMapType(e.dataType))
case _ => None
}

private def checkLimitLikeClause(name: String, limitExpr: Expression): Unit = {
limitExpr match {
case e if !e.foldable => limitExpr.failAnalysis(
@@ -859,7 +866,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsBase {
summary = j.origin.context.summary)

// TODO: although map type is not orderable, technically map type should be able to be
// used in equality comparison, remove this type check once we support it.
// used in equality comparison, remove this type check once we support it.
case o if mapColumnInSetOperation(o).isDefined =>
val mapCol = mapColumnInSetOperation(o).get
o.failAnalysis(
@@ -885,6 +892,14 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsBase {
"expr" -> variantExpr.sql,
"dataType" -> toSQLType(variantExpr.dataType)))

case o if mapExprInPartitionExpression(o).isDefined =>
val mapExpr = mapExprInPartitionExpression(o).get
o.failAnalysis(
errorClass = "UNSUPPORTED_FEATURE.PARTITION_BY_MAP",
messageParameters = Map(
"expr" -> mapExpr.sql,
"dataType" -> toSQLType(mapExpr.dataType)))

case o if o.expressions.exists(!_.deterministic) &&
!operatorAllowsNonDeterministicExpressions(o) &&
!o.isInstanceOf[Project] &&
42 changes: 42 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -371,6 +371,48 @@ class DataFrameSuite extends QueryTest
}
}

test("SPARK-50525 - cannot partition by map columns") {
val df = sql("select map(id, id) as m, id % 5 as id from range(0, 100, 1, 5)")
// map column
checkError(
exception = intercept[AnalysisException](df.repartition(5, col("m"))),
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_MAP",
parameters = Map(
"expr" -> "m",
"dataType" -> "\"MAP<BIGINT, BIGINT>\"")
)
// map producing expression
checkError(
exception = intercept[AnalysisException](df.repartition(5, map(col("id"), col("id")))),
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_MAP",
parameters = Map(
"expr" -> "map(id, id)",
"dataType" -> "\"MAP<BIGINT, BIGINT>\"")
)
// Partitioning by non-map column works
try {
df.repartition(5, col("id")).collect()
} catch {
case e: Exception =>
fail(s"Expected no exception to be thrown but an exception was thrown: ${e.getMessage}")
}
// SQL
withTempView("tv") {
df.createOrReplaceTempView("tv")
checkError(
exception = intercept[AnalysisException](sql("SELECT * FROM tv DISTRIBUTE BY m")),
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_MAP",
parameters = Map(
"expr" -> "tv.m",
"dataType" -> "\"MAP<BIGINT, BIGINT>\""),
context = ExpectedContext(
fragment = "DISTRIBUTE BY m",
start = 17,
stop = 31)
)
}
}

test("repartition with SortOrder") {
// passing SortOrder expressions to .repartition() should result in an informative error

