[SPARK-51119][SQL][FOLLOW-UP] Add fallback to ResolveDefaultColumnsUtil existenceDefaultValues

### What changes were proposed in this pull request?
The original change in apache#49840 was too optimistic: it assumed that every column's EXISTS_DEFAULT is already resolved and constant-folded. However, if bad EXISTS_DEFAULT metadata has been persisted (an unresolved expression), it will break. This follow-up adds a fallback to the original logic for that case.
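For illustration, here is a minimal sketch (mirroring the new unit test below) of well-formed versus bad EXISTS_DEFAULT metadata on a `StructField`. The field name `c0` is arbitrary; the metadata keys are the ones `ResolveDefaultColumns` exposes:

```scala
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}

// Well-formed: EXISTS_DEFAULT was persisted as a resolved, constant-folded
// literal, so the fast path from apache#49840 can use it directly.
val goodField = StructField("c0", StringType, nullable = true,
  new MetadataBuilder()
    .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "'spark_catalog'")
    .build())

// Bad: an external catalog persisted an unresolved expression; resolving it
// now requires the full-analysis fallback added by this PR.
val badField = StructField("c0", StringType, nullable = true,
  new MetadataBuilder()
    .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
    .build())
```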

### Why are the changes needed?
There are cases where bad EXISTS_DEFAULT metadata is persisted by external catalogs, due to bugs such as apache#49942 or other problems.

### Does this PR introduce _any_ user-facing change?
No, it should handle bad metadata better.

### How was this patch tested?
Added a unit test.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#49962 from szehon-ho/SPARK-51119-follow-2.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
szehon-ho authored and cloud-fan committed Feb 21, 2025
1 parent f37be89 commit 4ffc398
Showing 2 changed files with 51 additions and 5 deletions.
ResolveDefaultColumnsUtil.scala:
@@ -19,14 +19,15 @@ package org.apache.spark.sql.catalyst.util

 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
+import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Literal => ExprLiteral}
 import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, Optimizer}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -340,12 +341,39 @@ object ResolveDefaultColumns extends QueryErrorsBase
         throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
           "", field.name, defaultSQL)
       }
-      if (!expr.resolved) {
-        throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
-          "", field.name, defaultSQL, null)
+
+      val resolvedExpr = expr match {
+        case _: ExprLiteral => expr
+        case c: Cast if c.resolved => expr
+        case _ =>
+          fallbackResolveExistenceDefaultValue(field)
       }
 
-      coerceDefaultValue(expr, field.dataType, "", field.name, defaultSQL)
+      coerceDefaultValue(resolvedExpr, field.dataType, "", field.name, defaultSQL)
     }

+  // In most cases, a column's existsDefault should already be persisted as resolved,
+  // constant-folded literal SQL. But because these values are fetched from an external
+  // catalog, that assumption may not hold, so we fall back to full analysis if we
+  // encounter an unresolved existsDefault.
+  private def fallbackResolveExistenceDefaultValue(
+      field: StructField): Expression = {
+    field.getExistenceDefaultValue().map { defaultSQL: String =>
+
+      logWarning(log"Encountered unresolved exists default value: " +
+        log"'${MDC(COLUMN_DEFAULT_VALUE, defaultSQL)}' " +
+        log"for column ${MDC(COLUMN_NAME, field.name)} " +
+        log"with ${MDC(COLUMN_DATA_TYPE_SOURCE, field.dataType)}, " +
+        log"falling back to full analysis.")
+
+      val expr = analyze(field, "", EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+      val literal = expr match {
+        case _: ExprLiteral | _: Cast => expr
+        case _ => throw SparkException.internalError("Failed to parse existence default " +
+          s"as a literal; field name: ${field.name}, value: $defaultSQL")
+      }
+      literal
+    }.orNull
+  }

/**
StructTypeSuite.scala:
@@ -831,4 +831,22 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
validateConvertedDefaults("c4", VariantType, "parse_json(null)", "CAST(NULL AS VARIANT)")
validateConvertedDefaults("c5", IntegerType, "1 + 1", "2")
}

test("SPARK-51119: Add fallback to process unresolved EXISTS_DEFAULT") {
val source = StructType(
Array(
StructField("c1", VariantType, true,
new MetadataBuilder()
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
.build()),
StructField("c0", StringType, true,
new MetadataBuilder()
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
.build())))
val res = ResolveDefaultColumns.existenceDefaultValues(source)
assert(res(0) == null)
assert(res(1) == UTF8String.fromString("spark_catalog"))
}
}
