Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51119][SQL][FOLLOW-UP] Add fallback to ResolveDefaultColumnsUtil existenceDefaultValues #49962

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ package org.apache.spark.sql.catalyst.util

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.{Literal => ExprLiteral}
import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, Optimizer}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -340,12 +341,44 @@ object ResolveDefaultColumns extends QueryErrorsBase
throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
"", field.name, defaultSQL)
}
if (!expr.resolved) {
throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
"", field.name, defaultSQL, null)

val resolvedExpr = expr match {
case _: ExprLiteral => expr
case c: Cast if c.resolved => expr
case _ =>
fallbackResolveExistenceDefaultValue(field)
}

coerceDefaultValue(expr, field.dataType, "", field.name, defaultSQL)
coerceDefaultValue(resolvedExpr, field.dataType, "", field.name, defaultSQL)
}

// In most cases, a column's existsDefault should already be persisted as resolved
// and constant-folded literal SQL. However, because these values are fetched from an
// external catalog, that assumption may not hold, so we fall back to full analysis
// if we encounter an unresolved existsDefault.
private def fallbackResolveExistenceDefaultValue(
    field: StructField): Expression = {
  // Returns null (not None) when the field carries no EXISTS_DEFAULT metadata, to
  // match the nullable Expression slots produced by existenceDefaultValues.
  field.getExistenceDefaultValue().map { defaultSQL: String =>

    // Reaching this path means the catalog handed us an EXISTS_DEFAULT that was not
    // stored in its expected pre-resolved form; surface that in the logs.
    logWarning(log"Encountered unresolved exists default value: " +
      log"'${MDC(COLUMN_DEFAULT_VALUE, defaultSQL)}' " +
      log"for column ${MDC(COLUMN_NAME, field.name)} " +
      log"with ${MDC(COLUMN_DATA_TYPE_SOURCE, field.dataType)}, " +
      log"falling back to full analysis.")

    // Run the full parse/analyze/constant-fold pipeline over the stored SQL text.
    val expr = analyze(field, "", EXISTS_DEFAULT_COLUMN_METADATA_KEY)
    // After constant folding the result must be a literal, possibly wrapped in a
    // cast to the column's type; anything else indicates an internal error rather
    // than a user-facing analysis failure.
    val literal = expr match {
      case _: ExprLiteral | _: Cast => expr
      case _ => throw SparkException.internalError(s"parse existence default as literal err," +
        s" field name: ${field.name}, value: $defaultSQL")
    }
    // Sanity check: analyze() is expected to return a resolved expression, but keep
    // a defensive guard so an unresolved default fails loudly here instead of at
    // execution time.
    if (!literal.resolved) {
      throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
        "", field.name, defaultSQL, null)
    }
    literal
  }.orNull
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,4 +831,22 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
validateConvertedDefaults("c4", VariantType, "parse_json(null)", "CAST(NULL AS VARIANT)")

}

test("SPARK-51119: Add fallback to process unresolved EXISTS_DEFAULT") {
  // Defaults metadata stored as raw, unresolved SQL text (rather than pre-resolved
  // literal SQL), simulating what a non-conforming external catalog might persist.
  val variantDefaults = new MetadataBuilder()
    .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
    .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
    .build()
  val catalogDefaults = new MetadataBuilder()
    .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
    .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
    .build()
  val schema = StructType(Array(
    StructField("c1", VariantType, true, variantDefaults),
    StructField("c0", StringType, true, catalogDefaults)))

  // The fallback analysis path must resolve both defaults to constant values.
  val defaults = ResolveDefaultColumns.existenceDefaultValues(schema)
  assert(defaults(0) == null)
  assert(defaults(1) == UTF8String.fromString("spark_catalog"))
}
}