
[SPARK-50503][SQL] Prohibit partitioning by Variant data #49080

5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -5303,6 +5303,11 @@
"Parameter markers are not allowed in <statement>."
]
},
"PARTITION_BY_VARIANT" : {
"message" : [
"Cannot use VARIANT producing expressions to partition a DataFrame, but the type of expression <expr> is <dataType>."
]
},
"PARTITION_WITH_NESTED_COLUMN_IS_UNSUPPORTED" : {
"message" : [
"Invalid partitioning: <cols> is missing or is in a map or array."
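Once registered under the "UNSUPPORTED_FEATURE" group, the condition surfaces to users roughly as follows for a plain variant column (an illustrative substitution of <expr> and <dataType>, matching the parameters asserted by the new DataFrameSuite test later in this PR):

```text
[UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT] Cannot use VARIANT producing expressions to partition a DataFrame, but the type of expression v is "VARIANT".
```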
@@ -96,6 +96,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsBase
case _ => None
}

protected def variantExprInPartitionExpression(plan: LogicalPlan): Option[Expression] =
Review discussion:

Contributor: qq, Map and Variant should be excluded in the same scenarios. Can you check whether we are covering everything for Variant by checking what we do for Map?

Contributor Author (@harshmotw-db, Dec 5, 2024): Repartition by map is currently allowed. I don't know if that is intended. My previous PR, which blocked set operations on Variant, followed the same pattern as Maps.

Contributor: We should also add a check for Map, I suppose.

Contributor Author: That could be quite risky, considering several Spark workloads could currently be repartitioning by map types. It's fine for Variant, as Variant is a new feature. Also, broken "repartitioning" isn't a correctness problem, since query results should be agnostic of the partitioning, right? I think it would result in suboptimal performance at worst.

Contributor: @harshmotw-db No, it will be an actual correctness issue. The problem, like with variant, is that the order of key-value pairs in a map is undefined, e.g. Map(a -> b, b -> c) == Map(b -> c, a -> b). However, this is not your problem. Your point about it being a problem for existing workloads is correct; that would only mean we have to add a feature flag. cc @cloud-fan @HyukjinKwon

Contributor: Ah, good catch! I think we can fix it now by adding a MapSort expression for the Repartition node in InsertMapSortInGroupingExpressions (and renaming the rule).

cc @stevomitric @nebojsa-db
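The correctness concern raised in the discussion above can be illustrated without Spark. Two equal maps may iterate (and therefore serialize) their entries in different orders, so a partitioner that hashes an order-sensitive encoding of the map, as Spark's UnsafeRow encoding of MapType is, can route equal values to different partitions. A minimal self-contained sketch; the `serialize` and `partitionOf` helpers are illustrative stand-ins, not Spark code:

```scala
// Order-sensitive encoding: equal maps built in different insertion orders
// can produce different strings, analogous to UnsafeRow bytes for MapType.
def serialize(m: Map[String, Int]): String =
  m.iterator.map { case (k, v) => s"$k=$v" }.mkString(",")

// A hash partitioner over the serialized form (illustrative only).
def partitionOf(m: Map[String, Int], numPartitions: Int): Int =
  math.floorMod(serialize(m).hashCode, numPartitions)

// Small Scala immutable maps iterate in insertion order, so these two
// equal maps serialize differently.
val m1 = Map("a" -> 1, "b" -> 2)
val m2 = Map("b" -> 2, "a" -> 1)

// m1 == m2 holds, yet serialize(m1) != serialize(m2), so partitionOf is
// not guaranteed to send the two (equal) values to the same partition.
```

This is why sorting map entries first (the proposed MapSort rewrite) restores a deterministic encoding before hashing.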

plan match {
case r: RepartitionByExpression =>
r.partitionExpressions.find(e => hasVariantType(e.dataType))
case _ => None
}
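The helper above delegates the type test to a recursive `hasVariantType` check defined elsewhere in CheckAnalysis. A minimal self-contained sketch of that kind of traversal, using illustrative stand-in classes rather than Spark's actual `DataType` hierarchy, shows why a variant nested inside a struct (like column `s` in the tests) is also rejected:

```scala
// Stand-in type hierarchy (illustrative, not Spark's real classes).
sealed trait DataType
case object VariantType extends DataType
case object IntegerType extends DataType
case object StringType extends DataType
final case class ArrayType(elementType: DataType) extends DataType
final case class MapType(keyType: DataType, valueType: DataType) extends DataType
final case class StructField(name: String, dataType: DataType)
final case class StructType(fields: Seq[StructField]) extends DataType

// Recursively look for VARIANT anywhere inside a possibly nested type.
def hasVariantType(dt: DataType): Boolean = dt match {
  case VariantType        => true
  case ArrayType(et)      => hasVariantType(et)
  case MapType(kt, vt)    => hasVariantType(kt) || hasVariantType(vt)
  case StructType(fields) => fields.exists(f => hasVariantType(f.dataType))
  case _                  => false
}
```

With this shape of check, `StructType(Seq(StructField("v", VariantType)))` is flagged just like a bare `VariantType`, while purely scalar schemas pass.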

private def checkLimitLikeClause(name: String, limitExpr: Expression): Unit = {
limitExpr match {
case e if !e.foldable => limitExpr.failAnalysis(
@@ -849,6 +856,14 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsBase
"colName" -> toSQLId(variantCol.name),
"dataType" -> toSQLType(variantCol.dataType)))

case o if variantExprInPartitionExpression(o).isDefined =>
val variantExpr = variantExprInPartitionExpression(o).get
o.failAnalysis(
errorClass = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
messageParameters = Map(
"expr" -> variantExpr.sql,
"dataType" -> toSQLType(variantExpr.dataType)))

case o if o.expressions.exists(!_.deterministic) &&
!operatorAllowsNonDeterministicExpressions(o) &&
!o.isInstanceOf[Project] &&
63 changes: 63 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -308,6 +308,69 @@ class DataFrameSuite extends QueryTest
testData.select("key").collect().toSeq)
}

test("SPARK-50503 - cannot partition by variant columns") {
val df = sql("select parse_json(case when id = 0 then 'null' else '1' end)" +
" as v, id % 5 as id, named_struct('v', parse_json(id::string)) s from range(0, 100, 1, 5)")
// variant column
checkError(
exception = intercept[AnalysisException](df.repartition(5, col("v"))),
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
parameters = Map(
"expr" -> "v",
"dataType" -> "\"VARIANT\"")
)
// nested variant column
checkError(
exception = intercept[AnalysisException](df.repartition(5, col("s"))),
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
parameters = Map(
"expr" -> "s",
"dataType" -> "\"STRUCT<v: VARIANT NOT NULL>\"")
)
// variant producing expression
checkError(
exception =
intercept[AnalysisException](df.repartition(5, parse_json(col("id").cast("string")))),
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
parameters = Map(
"expr" -> "parse_json(CAST(id AS STRING))",
"dataType" -> "\"VARIANT\"")
)
// Partitioning by non-variant column works
try {
df.repartition(5, col("id")).collect()
} catch {
case e: Exception =>
fail(s"Expected no exception to be thrown but an exception was thrown: ${e.getMessage}")
}
// SQL
withTempView("tv") {
df.createOrReplaceTempView("tv")
checkError(
exception = intercept[AnalysisException](sql("SELECT * FROM tv DISTRIBUTE BY v")),
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
parameters = Map(
"expr" -> "tv.v",
"dataType" -> "\"VARIANT\""),
context = ExpectedContext(
fragment = "DISTRIBUTE BY v",
start = 17,
stop = 31)
)
checkError(
exception = intercept[AnalysisException](sql("SELECT * FROM tv DISTRIBUTE BY s")),
condition = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
parameters = Map(
"expr" -> "tv.s",
"dataType" -> "\"STRUCT<v: VARIANT NOT NULL>\""),
context = ExpectedContext(
fragment = "DISTRIBUTE BY s",
start = 17,
stop = 31)
)
}
}

test("repartition with SortOrder") {
// passing SortOrder expressions to .repartition() should result in an informative error
