[Delta Sharing] Enable D2D delta sharing with type widening (#3675)
## Description
Adds the type widening table features to the list of features supported by D2D Delta
sharing, and adds client-side tests covering reads of a table whose column type was
widened using the type widening table feature.

## How was this patch tested?
Added a new `DeltaSharingDataSourceTypeWideningSuite` covering direct reads, time
travel, and JSON predicate hints, and extended `TestClientForDeltaFormatSharing` to
accept the type widening reader features.
johanl-db authored Nov 18, 2024
1 parent 3a98b8a commit ed6ae70
Showing 3 changed files with 226 additions and 17 deletions.
@@ -22,15 +22,7 @@ import java.util.{TimeZone, UUID}

import scala.reflect.ClassTag

-import org.apache.spark.sql.delta.{
-  ColumnMappingTableFeature,
-  DeletionVectorsTableFeature,
-  DeltaLog,
-  DeltaParquetFileFormat,
-  SnapshotDescriptor,
-  TimestampNTZTableFeature
-}
-import org.apache.spark.sql.delta.VariantTypeTableFeature
+import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import com.google.common.hash.Hashing
import io.delta.sharing.client.{DeltaSharingClient, DeltaSharingRestClient}
@@ -51,13 +43,18 @@ object DeltaSharingUtils extends Logging {
      DeletionVectorsTableFeature.name,
      ColumnMappingTableFeature.name,
      TimestampNTZTableFeature.name,
+     TypeWideningPreviewTableFeature.name,
+     TypeWideningTableFeature.name,
      VariantTypeTableFeature.name
    )
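
  // Both the preview and the stable type widening feature names are listed: shared
  // tables may carry either one in their protocol, depending on when the type change
  // was applied.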

  val SUPPORTED_READER_FEATURES: Seq[String] =
    Seq(
      DeletionVectorsTableFeature.name,
      ColumnMappingTableFeature.name,
      TimestampNTZTableFeature.name,
+     TypeWideningPreviewTableFeature.name,
+     TypeWideningTableFeature.name,
      VariantTypeTableFeature.name
    )

@@ -0,0 +1,207 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.delta.sharing.spark

import org.apache.spark.sql.delta.DeltaConfigs
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, QueryTest}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.delta.sharing.DeltaSharingTestSparkUtils
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Unit tests to verify that type widening works with delta sharing.
class DeltaSharingDataSourceTypeWideningSuite
    extends QueryTest
    with DeltaSQLCommandTest
    with DeltaSharingTestSparkUtils
    with DeltaSharingDataSourceDeltaTestUtils {

  import testImplicits._

  protected override def sparkConf: SparkConf = {
    super.sparkConf
      .set(DeltaConfigs.ENABLE_TYPE_WIDENING.defaultTablePropertyKey, true.toString)
  }
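
  // The default table property set above enables type widening on every table created
  // in this suite, which is what allows the ALTER TABLE ... CHANGE COLUMN type changes
  // applied below.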

  /** Sets up delta sharing mocks to read a table and validates results. */
  private def testReadingDeltaShare(
      tableName: String,
      versionAsOf: Option[Long],
      filter: Option[Column] = None,
      expectedSchema: StructType,
      expectedJsonPredicate: Seq[String] = Seq.empty,
      expectedResult: DataFrame): Unit = {
    withTempDir { tempDir =>
      val sharedTableName = tableName + "shared_delta_table"
      prepareMockedClientMetadata(tableName, sharedTableName)
      prepareMockedClientGetTableVersion(tableName, sharedTableName, versionAsOf)
      prepareMockedClientAndFileSystemResult(tableName, sharedTableName, versionAsOf)

      var reader = spark.read
        .format("deltaSharing")
        .option("responseFormat", DeltaSharingOptions.RESPONSE_FORMAT_DELTA)
      versionAsOf.foreach { version =>
        reader = reader.option("versionAsOf", version)
      }

      TestClientForDeltaFormatSharing.jsonPredicateHints.clear()
      withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
        val profileFile = prepareProfileFile(tempDir)
        var result = reader
          .load(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName")
        filter.foreach { f =>
          result = result.filter(f)
        }
        assert(result.schema === expectedSchema)
        checkAnswer(result, expectedResult)
        assert(getJsonPredicateHints(tableName) === expectedJsonPredicate)
      }
    }
  }

  /** Fetches JSON predicates passed to the test client when reading a table. */
  private def getJsonPredicateHints(tableName: String): Seq[String] = {
    TestClientForDeltaFormatSharing
      .jsonPredicateHints
      .filterKeys(_.contains(tableName))
      .values
      .toSeq
  }

  /** Creates a table and applies a type change to it. */
  private def withTestTable(testBody: String => Unit): Unit = {
    val deltaTableName = "type_widening"
    withTable(deltaTableName) {
      sql(s"CREATE TABLE $deltaTableName (value SMALLINT) USING DELTA")
      sql(s"INSERT INTO $deltaTableName VALUES (1), (2)")
      sql(s"ALTER TABLE $deltaTableName CHANGE COLUMN value TYPE INT")
      sql(s"INSERT INTO $deltaTableName VALUES (3), (${Int.MaxValue})")
      sql(s"INSERT INTO $deltaTableName VALUES (4), (5)")
      testBody(deltaTableName)
    }
  }
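
  // The test table above goes through versions 0 (CREATE) through 4, with the
  // SMALLINT -> INT type change applied at version 2. That version number is what
  // `tableVersion` in the metadata below refers to.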

  /** Short-hand for the type widening metadata for column `value` for the test table above. */
  private val typeWideningMetadata: Metadata =
    new MetadataBuilder()
      .putMetadataArray(
        "delta.typeChanges", Array(
          new MetadataBuilder()
            .putLong("tableVersion", 2)
            .putString("fromType", "short")
            .putString("toType", "integer")
            .build()))
      .build()
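
  // When serialized into the table schema, this surfaces as field metadata on `value`,
  // roughly: {"delta.typeChanges":[{"tableVersion":2,"fromType":"short","toType":"integer"}]}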

test(s"Delta sharing with type widening") {
withTestTable { tableName =>
testReadingDeltaShare(
tableName,
versionAsOf = None,
expectedSchema = new StructType()
.add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
expectedResult = Seq(1, 2, 3, Int.MaxValue, 4, 5).toDF("value"))
}
}

test("Delta sharing with type widening, time travel") {
withTestTable { tableName =>
testReadingDeltaShare(
tableName,
versionAsOf = Some(3),
expectedSchema = new StructType()
.add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
expectedResult = Seq(1, 2, 3, Int.MaxValue).toDF("value"))

testReadingDeltaShare(
tableName,
versionAsOf = Some(2),
expectedSchema = new StructType()
.add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
expectedResult = Seq(1, 2).toDF("value"))

testReadingDeltaShare(
tableName,
versionAsOf = Some(1),
expectedSchema = new StructType()
.add("value", ShortType),
expectedResult = Seq(1, 2).toDF("value"))
}
}
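
  // The two tests below check that the predicate hints sent to the server use the
  // widened type ("int") for both `valueType` and literal values, not the original
  // "short" type.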

test("jsonPredicateHints on non-partition column after type widening") {
withTestTable { tableName =>
testReadingDeltaShare(
tableName,
versionAsOf = None,
filter = Some(col("value") === Int.MaxValue),
expectedSchema = new StructType()
.add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
expectedResult = Seq(Int.MaxValue).toDF("value"),
expectedJsonPredicate = Seq(
"""
|{"op":"and","children":[
| {"op":"not","children":[
| {"op":"isNull","children":[
| {"op":"column","name":"value","valueType":"int"}]}]},
| {"op":"equal","children":[
| {"op":"column","name":"value","valueType":"int"},
| {"op":"literal","value":"2147483647","valueType":"int"}]}]}
""".stripMargin.replaceAll("\n", "").replaceAll(" ", ""))
)
}
}

test("jsonPredicateHints on partition column after type widening") {
val deltaTableName = "type_widening_partitioned"
withTable(deltaTableName) {
sql(
s"""
|CREATE TABLE $deltaTableName (part SMALLINT, value SMALLINT)
|USING DELTA
|PARTITIONED BY (part)
""".stripMargin
)
sql(s"INSERT INTO $deltaTableName VALUES (1, 1), (2, 2)")
sql(s"ALTER TABLE $deltaTableName CHANGE COLUMN part TYPE INT")
sql(s"INSERT INTO $deltaTableName VALUES (3, 3), (${Int.MaxValue}, 4)")

testReadingDeltaShare(
deltaTableName,
versionAsOf = None,
filter = Some(col("part") === Int.MaxValue),
expectedSchema = new StructType()
.add("part", IntegerType, nullable = true, metadata = typeWideningMetadata)
.add("value", ShortType),
expectedResult = Seq((Int.MaxValue, 4)).toDF("part", "value"),
expectedJsonPredicate = Seq(
"""
|{"op":"and","children":[
| {"op":"not","children":[
| {"op":"isNull","children":[
| {"op":"column","name":"part","valueType":"int"}]}]},
| {"op":"equal","children":[
| {"op":"column","name":"part","valueType":"int"},
| {"op":"literal","value":"2147483647","valueType":"int"}]}]}
""".stripMargin.replaceAll("\n", "").replaceAll(" ", ""))
)
}
}
}
@@ -18,6 +18,7 @@ package io.delta.sharing.spark

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.util.JsonUtils
import io.delta.sharing.client.{
  DeltaSharingClient,
@@ -60,16 +61,20 @@ private[spark] class TestClientForDeltaFormatSharing(
    tokenRenewalThresholdInSeconds: Int = 600)
  extends DeltaSharingClient {

+  private val supportedReaderFeatures: Seq[String] = Seq(
+    DeletionVectorsTableFeature,
+    ColumnMappingTableFeature,
+    TimestampNTZTableFeature,
+    TypeWideningPreviewTableFeature,
+    TypeWideningTableFeature,
+    VariantTypeTableFeature
+  ).map(_.name)

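  // Mirrors DeltaSharingUtils.SUPPORTED_READER_FEATURES: the assert below checks that
  // every feature in this list appears in the comma-separated `readerFeatures` string
  // passed in by the caller.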
  assert(
    responseFormat == DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET ||
-     (
-       readerFeatures.contains("deletionVectors") &&
-       readerFeatures.contains("columnMapping") &&
-       readerFeatures.contains("timestampNtz") &&
-       readerFeatures.contains("variantType-preview")
-     ),
-   "deletionVectors, columnMapping, timestampNtz, variantType-preview should be supported in " +
-     "all types of queries."
+     supportedReaderFeatures.forall(readerFeatures.split(",").contains),
+   s"${supportedReaderFeatures.diff(readerFeatures.split(",")).mkString(", ")} " +
+     s"should be supported in all types of queries."
  )

  import TestClientForDeltaFormatSharing._
