[Spark] Add order-agnostic type widening modes (#4164)
## Description
All code paths implementing type widening so far had a clear 'before'
and 'after' schema and checked that 'before' could be widened to
'after'.
An additional use case is, given two schemas from two separate flows, to
compute a wider schema from the two.
To support this, 'bidirectional' widening modes are added to
`SchemaMergingUtils.mergeSchemas()`.
These allow picking the wider of the two input types, or, for decimals,
picking a decimal type that is wider than each input.
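
For illustration, a minimal sketch of how a caller could request this behavior (the exact
`mergeSchemas` parameter list is assumed here; only `TypeEvolutionToCommonWiderType` comes from
this change, and the schemas are made up):

import org.apache.spark.sql.delta.TypeWideningMode
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.types._

// Two schemas from two separate flows; neither is the 'before' or the 'after'.
val first = new StructType()
  .add("id", ShortType)
  .add("price", DecimalType(10, 2))
val second = new StructType()
  .add("id", IntegerType)
  .add("price", DecimalType(5, 4))

// A bidirectional mode picks the wider side for 'id' and, for 'price', a decimal
// type wider than both inputs -- regardless of which schema is passed first.
val merged = SchemaMergingUtils.mergeSchemas(
  first, second,
  typeWideningMode = TypeWideningMode.TypeEvolutionToCommonWiderType(
    uniformIcebergCompatibleOnly = false))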

## How was this patch tested?
Added unit tests for `SchemaMergingUtils.mergeSchemas()`.
johanl-db authored Feb 20, 2025
1 parent 36d1dae commit ac54ef7
Showing 7 changed files with 355 additions and 23 deletions.
@@ -0,0 +1,26 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.spark.sql.catalyst.analysis.DecimalPrecision
import org.apache.spark.sql.types.DecimalType

object DecimalPrecisionTypeCoercionShims {
// Returns the decimal type that is wider than both input types
def widerDecimalType(d1: DecimalType, d2: DecimalType): DecimalType =
DecimalPrecision.widerDecimalType(d1, d2)
}
@@ -0,0 +1,26 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.spark.sql.catalyst.analysis.DecimalPrecisionTypeCoercion
import org.apache.spark.sql.types.DecimalType

object DecimalPrecisionTypeCoercionShims {
// Returns the decimal type that is wider than both input types
def widerDecimalType(d1: DecimalType, d2: DecimalType): DecimalType =
DecimalPrecisionTypeCoercion.widerDecimalType(d1, d2)
}
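
Both shim variants expose the same entry point and differ only in the Spark-internal class they
delegate to. Spark's rule keeps max(s1, s2) fractional digits and max(p1 - s1, p2 - s2) integral
digits, so the result can be wider than either input. A worked example:

import org.apache.spark.sql.delta.DecimalPrecisionTypeCoercionShims
import org.apache.spark.sql.types.DecimalType

// decimal(10, 2) vs decimal(5, 4): integral digits max(8, 1) = 8,
// scale max(2, 4) = 4, so the common wider type is decimal(12, 4).
val wider = DecimalPrecisionTypeCoercionShims.widerDecimalType(
  DecimalType(10, 2), DecimalType(5, 4))
assert(wider == DecimalType(12, 4))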
@@ -925,7 +925,7 @@ class DeltaAnalysis(session: SparkSession)
if s != t && sNull == tNull =>
addCastsToArrayStructs(tblName, attr, s, t, sNull, typeWideningMode)
case (s: AtomicType, t: AtomicType)
if typeWideningMode.shouldWidenType(fromType = t, toType = s) =>
if typeWideningMode.shouldWidenTo(fromType = t, toType = s) =>
// Keep the type from the query, the target schema will be updated to widen the existing
// type to match it.
attr
@@ -1097,7 +1097,7 @@ class DeltaAnalysis(session: SparkSession)

case (StructField(name, sourceType: AtomicType, _, _),
i @ TargetIndex(StructField(targetName, targetType: AtomicType, _, targetMetadata)))
if typeWideningMode.shouldWidenType(fromType = targetType, toType = sourceType) =>
if typeWideningMode.shouldWidenTo(fromType = targetType, toType = sourceType) =>
Alias(
GetStructField(parent, i, Option(name)),
targetName)(explicitMetadata = Option(targetMetadata))
@@ -16,44 +16,100 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.types.AtomicType
import org.apache.spark.sql.delta.DecimalPrecisionTypeCoercionShims
import org.apache.spark.sql.util.ScalaExtensions._

import org.apache.spark.sql.types.{AtomicType, DecimalType}

/**
* A type widening mode captures a specific set of type changes that are allowed to be applied.
* Currently:
* - NoTypeWidening: No type change is allowed.
* - AllTypeWidening: All supported type widening changes are allowed.
* - TypeEvolution(uniformIcebergCompatibleOnly = true): Type changes that are eligible to be
* applied automatically during schema evolution and that are supported by Iceberg are allowed.
* - TypeEvolution(uniformIcebergCompatibleOnly = false): Type changes that are eligible to be
* applied automatically during schema evolution are allowed, even if they are not supported by
* Iceberg.
* - AllTypeWidening: Allows widening to the target type using any supported type change.
* - TypeEvolution: Only allows widening to the target type if the type change is eligible to be
* applied automatically during schema evolution.
* - AllTypeWideningToCommonWiderType: Allows widening to a common (possibly different) wider type
* using any supported type change.
* - TypeEvolutionToCommonWiderType: Allows widening to a common (possibly different) wider type
* using only type changes that are eligible to be applied automatically during schema
* evolution.
*
* TypeEvolution modes can be restricted to only type changes supported by Iceberg by passing
* `uniformIcebergCompatibleOnly = true`, to ensure that we don't automatically apply a type change
* that would break Iceberg compatibility.
*/
sealed trait TypeWideningMode {
def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean
def getWidenedType(fromType: AtomicType, toType: AtomicType): Option[AtomicType]

def shouldWidenTo(fromType: AtomicType, toType: AtomicType): Boolean =
getWidenedType(fromType, toType).contains(toType)
}

object TypeWideningMode {
/**
* No type change allowed. Typically because type widening and/or schema evolution isn't enabled.
*/
case object NoTypeWidening extends TypeWideningMode {
override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean = false
override def getWidenedType(fromType: AtomicType, toType: AtomicType): Option[AtomicType] = None
}

/** All supported type widening changes are allowed. */
case object AllTypeWidening extends TypeWideningMode {
override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean =
TypeWidening.isTypeChangeSupported(fromType = fromType, toType = toType)
override def getWidenedType(fromType: AtomicType, toType: AtomicType): Option[AtomicType] =
Option.when(TypeWidening.isTypeChangeSupported(fromType = fromType, toType = toType))(toType)
}

/**
* Type changes that are eligible to be applied automatically during schema evolution are allowed.
* Can be restricted to only type changes supported by Iceberg.
*/
case class TypeEvolution(uniformIcebergCompatibleOnly: Boolean) extends TypeWideningMode {
override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean =
TypeWidening.isTypeChangeSupportedForSchemaEvolution(
fromType = fromType, toType = toType, uniformIcebergCompatibleOnly)
override def getWidenedType(fromType: AtomicType, toType: AtomicType): Option[AtomicType] =
Option.when(TypeWidening.isTypeChangeSupportedForSchemaEvolution(
fromType = fromType, toType = toType, uniformIcebergCompatibleOnly))(toType)
}

/**
* All supported type widening changes are allowed. Unlike [[AllTypeWidening]], this also allows
* widening `to` to `from` (i.e. the reverse direction), and for decimals, widening to a
* different decimal type that is wider than both input types. Use this, for example, when
* merging two unrelated schemas where we just want to find a wider schema to use.
*/
case object AllTypeWideningToCommonWiderType extends TypeWideningMode {
override def getWidenedType(left: AtomicType, right: AtomicType): Option[AtomicType] =
(left, right) match {
case (l, r) if TypeWidening.isTypeChangeSupported(l, r) => Some(r)
case (l, r) if TypeWidening.isTypeChangeSupported(r, l) => Some(l)
case (l: DecimalType, r: DecimalType) =>
val wider = DecimalPrecisionTypeCoercionShims.widerDecimalType(l, r)
Option.when(
TypeWidening.isTypeChangeSupported(l, wider) &&
TypeWidening.isTypeChangeSupported(r, wider))(wider)
case _ => None
}
}

/**
* Type changes that are eligible to be applied automatically during schema evolution are allowed.
* Can be restricted to only type changes supported by Iceberg. Unlike [[TypeEvolution]], this
* also allows widening `to` to `from` (i.e. the reverse direction), and for decimals, widening
* to a different decimal type that is wider than both input types. Use this, for example, when
* merging two unrelated schemas where we just want to find a wider schema to use.
*/
case class TypeEvolutionToCommonWiderType(uniformIcebergCompatibleOnly: Boolean)
extends TypeWideningMode {
override def getWidenedType(left: AtomicType, right: AtomicType): Option[AtomicType] = {
def typeChangeSupported: (AtomicType, AtomicType) => Boolean =
TypeWidening.isTypeChangeSupportedForSchemaEvolution(_, _, uniformIcebergCompatibleOnly)

(left, right) match {
case (l, r) if typeChangeSupported(l, r) => Some(r)
case (l, r) if typeChangeSupported(r, l) => Some(l)
case (l: DecimalType, r: DecimalType) =>
val wider = DecimalPrecisionTypeCoercionShims.widerDecimalType(l, r)
Option.when(typeChangeSupported(l, wider) && typeChangeSupported(r, wider))(wider)
case _ => None
}
}
}
}
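
A sketch of the order-agnostic behavior the two new modes add (short -> int is one of the
supported widenings; the decimal case assumes both individual changes are supported, as the
guards above require):

import org.apache.spark.sql.delta.TypeWideningMode._
import org.apache.spark.sql.types._

// The wider side wins regardless of argument order.
assert(AllTypeWideningToCommonWiderType.getWidenedType(ShortType, IntegerType)
  .contains(IntegerType))
assert(AllTypeWideningToCommonWiderType.getWidenedType(IntegerType, ShortType)
  .contains(IntegerType))

// For decimals, a third type wider than both inputs can be returned:
// decimal(10, 2) and decimal(5, 4) widen to the common type decimal(12, 4).
assert(AllTypeWideningToCommonWiderType
  .getWidenedType(DecimalType(10, 2), DecimalType(5, 4))
  .contains(DecimalType(12, 4)))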
@@ -242,7 +242,8 @@ object SchemaMergingUtils
// If type widening is enabled and the type can be widened, it takes precedence over
// keepExistingType.
case (current: AtomicType, update: AtomicType)
if typeWideningMode.shouldWidenType(fromType = current, toType = update) => update
if typeWideningMode.getWidenedType(fromType = current, toType = update).isDefined =>
typeWideningMode.getWidenedType(fromType = current, toType = update).get

// Simply keeps the existing type for primitive types
case (current, _) if keepExistingType => current
@@ -443,7 +443,7 @@ def normalizeColumnNamesInDataType(
isDatatypeReadCompatible(e.keyType, n.keyType) &&
isDatatypeReadCompatible(e.valueType, n.valueType)
case (e: AtomicType, n: AtomicType)
if typeWideningMode.shouldWidenType(fromType = e, toType = n) => true
if typeWideningMode.shouldWidenTo(fromType = e, toType = n) => true
case (a, b) => a == b
}
}
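
The renamed guard keeps its directional meaning: `shouldWidenTo` is defined on top of
`getWidenedType` and holds only when widening `fromType` lands exactly on `toType`, so read
compatibility still only accepts the existing type growing into the new one. A small sketch:

import org.apache.spark.sql.delta.TypeWideningMode.AllTypeWidening
import org.apache.spark.sql.types._

// An existing short column can be read as int...
assert(AllTypeWidening.shouldWidenTo(fromType = ShortType, toType = IntegerType))
// ...but the reverse direction would narrow rather than widen.
assert(!AllTypeWidening.shouldWidenTo(fromType = IntegerType, toType = ShortType))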