From 5d6124220b7900bb4e646ccf85eae5f1de1b5887 Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Tue, 18 Jul 2023 00:44:23 -0700
Subject: [PATCH] rdar://112431669: Auto load Iceberg extensions (#1805)

* Auto load IcebergSparkExtensions

* Add test

* Review comments

* Temp: upgrade to 1.3.0.1-apple to try to pass tests

* Fix tests

Co-authored-by: Szehon Ho
---
 .../apache/spark/sql/internal/SQLConf.scala   |  9 +++++++
 .../org/apache/spark/sql/SparkSession.scala   | 21 ++++++++++++++-
 .../sql/SparkSessionExtensionSuite.scala      | 26 ++++++++++++++++++++++---
 .../sql/execution/SQLViewTestSuite.scala      |  4 +--
 4 files changed, 54 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 532e563e26c4e..743c53c7f62be 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4206,6 +4206,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ICEBERG_ENABLED =
+    buildConf("spark.sql.extensions.iceberg.enabled")
+      .internal()
+      .doc("Whether to automatically load org.apache.iceberg.spark.extensions" +
+        ".IcebergSparkSessionExtensions by default.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)
+
   /**
    * Holds information about keys that have been deprecated.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index b6781f6d98702..2c2e7179cfba4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
 import scala.collection.JavaConverters._
 import scala.reflect.runtime.universe.TypeTag
+import scala.util.Try
 import scala.util.control.NonFatal
 
 import com.apple.boson.BosonConf
@@ -839,6 +840,8 @@ class SparkSession private(
 @Stable
 object SparkSession extends Logging {
 
+  def icebergClass: String = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
+
   /**
    * Builder for [[SparkSession]].
    */
@@ -1312,6 +1315,14 @@ object SparkSession extends Logging {
     }
   }
 
+  private def loadIcebergExtension(sparkContext: SparkContext): Seq[String] = {
+    if (sparkContext.getConf.getBoolean(SQLConf.ICEBERG_ENABLED.key, isIcebergEnabled)) {
+      Seq(icebergClass)
+    } else {
+      Seq.empty
+    }
+  }
+
   /**
    * Initialize extensions specified in [[StaticSQLConf]]. The classes will be applied to the
    * extensions passed into this function.
@@ -1320,7 +1331,8 @@ object SparkSession extends Logging {
       sparkContext: SparkContext,
       extensions: SparkSessionExtensions): SparkSessionExtensions = {
     val extensionConfClassNames = sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
-      .getOrElse(Seq.empty) ++ loadBosonExtension(sparkContext)
+      .getOrElse(Seq.empty) ++ loadBosonExtension(sparkContext) ++
+      loadIcebergExtension(sparkContext)
     extensionConfClassNames.foreach { extensionConfClassName =>
       try {
         val extensionConfClass = Utils.classForName(extensionConfClassName)
@@ -1363,4 +1375,11 @@ object SparkSession extends Logging {
     val v = System.getenv("BOSON")
     v == null || v.toBoolean
   }
+
+  /**
+   * Whether the Iceberg extension is enabled.
+   */
+  def isIcebergEnabled: Boolean = {
+    Try(Utils.classForName(icebergClass, false)).isSuccess
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 1d3efcb86bc0d..1db09fb85a7ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -39,10 +39,11 @@ import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec, WriteFilesSpec}
+import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.COLUMN_BATCH_SIZE
+import org.apache.spark.sql.internal.SQLConf.{COLUMN_BATCH_SIZE, ICEBERG_ENABLED}
 import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS
 import org.apache.spark.sql.types.{DataType, Decimal, IntegerType, LongType, Metadata, StructType}
 import org.apache.spark.sql.vectorized.{ColumnarArray, ColumnarBatch, ColumnarMap, ColumnVector}
@@ -62,15 +63,31 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper {
   }
 
   private def withSession(
-      builders: Seq[SparkSessionExtensionsProvider])(f: SparkSession => Unit): Unit = {
+      builders: Seq[SparkSessionExtensionsProvider], pairs: (String, String)*)
+      (f: SparkSession => Unit): Unit = {
     val builder = SparkSession.builder().master("local[1]")
     builders.foreach(builder.withExtensions)
-    val spark = builder.getOrCreate()
+    val configuredBuilder =
+      if (!SQLConf.get.contains(SQLConf.ICEBERG_ENABLED.key)) {
+        builder.config(SQLConf.ICEBERG_ENABLED.key, "false")
+      } else {
+        builder
+      }
+    val spark = configuredBuilder.getOrCreate()
     try f(spark) finally {
       stop(spark)
     }
   }
 
+  test("Test Iceberg extension") {
+    withSQLConf(SQLConf.ICEBERG_ENABLED.key -> "true") {
+      withSession(Seq()) { session =>
+        assert(session.sessionState.planner.strategies.contains(
+          ExtendedDataSourceV2Strategy(session)))
+      }
+    }
+  }
+
   test("inject analyzer rule") {
     withSession(Seq(_.injectResolutionRule(MyRule))) { session =>
       assert(session.sessionState.analyzer.extendedResolutionRules.contains(MyRule(session)))
@@ -341,6 +358,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper {
     val session = SparkSession.builder()
       .master("local[1]")
       .config(SPARK_SESSION_EXTENSIONS.key, classOf[MyExtensions].getCanonicalName)
+      .config(ICEBERG_ENABLED.key, false)
       .getOrCreate()
     try {
       assert(session.sessionState.planner.strategies.contains(MySparkStrategy(session)))
@@ -364,6 +382,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper {
       .config(SPARK_SESSION_EXTENSIONS.key, Seq(
         classOf[MyExtensions2].getCanonicalName,
         classOf[MyExtensions].getCanonicalName).mkString(","))
+      .config(ICEBERG_ENABLED.key, false)
       .getOrCreate()
     try {
       assert(session.sessionState.planner.strategies.containsSlice(
@@ -392,6 +411,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper {
       .config(SPARK_SESSION_EXTENSIONS.key, Seq(
         classOf[MyExtensions].getCanonicalName,
         classOf[MyExtensions].getCanonicalName).mkString(","))
+      .config(ICEBERG_ENABLED.key, false)
       .getOrCreate()
     try {
       assert(session.sessionState.planner.strategies.count(_ === MySparkStrategy(session)) === 2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index 592f1c2607da8..680a5e1bd36cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -663,8 +663,8 @@ class PersistedViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
       val message = intercept[AnalysisException] {
         sql("SELECT * FROM v")
       }.getMessage
-      assert(message.contains(s"Invalid view text: $dropView." +
-        s" The view ${table.qualifiedName} may have been tampered with"))
+      assert(message.contains(s"has an incompatible schema change " +
+        s"and column 1 cannot be resolved"))
     }
   }
 
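
The auto-load decision in loadIcebergExtension has two inputs: an explicit
spark.sql.extensions.iceberg.enabled value always wins, and when the conf is
unset, the extension is loaded only if IcebergSparkSessionExtensions resolves
on the classpath. A minimal standalone sketch of that decision, using plain
Class.forName in place of Spark's internal Utils.classForName; the
IcebergAutoLoadSketch wrapper and SparkConf wiring are illustrative, not part
of the patch:

    import scala.util.Try

    import org.apache.spark.SparkConf

    object IcebergAutoLoadSketch {
      val icebergClass = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"

      // Default when the conf is unset: enabled iff the class resolves.
      def isIcebergEnabled: Boolean = Try(Class.forName(icebergClass)).isSuccess

      // Mirrors loadIcebergExtension: an explicit conf value overrides detection.
      def extensionsToLoad(conf: SparkConf): Seq[String] =
        if (conf.getBoolean("spark.sql.extensions.iceberg.enabled", isIcebergEnabled)) {
          Seq(icebergClass)
        } else {
          Seq.empty
        }
    }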
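
Because the conf defaults to on whenever the Iceberg class is present, a user
who wants the previous behavior opts out per session. A sketch of that, using
only the conf key introduced above (the session setup itself is illustrative):

    import org.apache.spark.sql.SparkSession

    // Disable auto-loading of IcebergSparkSessionExtensions for this session.
    val spark = SparkSession.builder()
      .master("local[1]")
      .config("spark.sql.extensions.iceberg.enabled", "false")
      .getOrCreate()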