diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsCatalogOptions.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsCatalogOptions.java
index 2c55c73606413..5225b12788c49 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsCatalogOptions.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsCatalogOptions.java
@@ -48,6 +48,6 @@ public interface SupportsCatalogOptions extends TableProvider {
    * topic name, etc. It's an immutable case-insensitive string-to-string map.
    */
   default String extractCatalog(CaseInsensitiveStringMap options) {
-    return null;
+    return CatalogManager.SESSION_CATALOG_NAME();
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index c67104384be59..30d0c851964d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -216,6 +216,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption
     val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
     val table = provider match {
+      case _: SupportsCatalogOptions if userSpecifiedSchema.nonEmpty =>
+        throw new IllegalArgumentException(
+          s"$source does not support user specified schema. Please don't specify the schema.")
       case hasCatalog: SupportsCatalogOptions =>
         val ident = hasCatalog.extractIdentifier(dsOptions)
         val catalog = CatalogV2Util.getTableProviderCatalog(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3dda3f986355a..998ec9ebdff85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Util, Identifier, SupportsCatalogOptions, SupportsWrite, Table, TableCatalog, TableProvider, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, SupportsWrite, Table, TableCatalog, TableProvider, V1Table}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, LiteralValue, Transform}
 import org.apache.spark.sql.execution.SQLExecution
@@ -661,9 +661,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val partitioning = partitioningColumns.map { colNames =>
       colNames.map(name => IdentityTransform(FieldReference(name)))
     }.getOrElse(Seq.empty[Transform])
-    val bucketing = bucketColumnNames.map { cols =>
-      Seq(BucketTransform(LiteralValue(numBuckets.get, IntegerType), cols.map(FieldReference(_))))
-    }.getOrElse(Seq.empty[Transform])
+    val bucketing =
+      getBucketSpec.map(spec => CatalogV2Implicits.BucketSpecHelper(spec).asTransform).toSeq
     partitioning ++ bucketing
   }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index 7fd4cc113aa65..0148bb07ee967 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -179,6 +179,23 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
     checkAnswer(load("t1", Some(catalogName)), df2.toDF())
   }

+  test("fail on user specified schema when reading - session catalog") {
+    sql(s"create table t1 (id bigint) using $format")
+    val e = intercept[IllegalArgumentException] {
+      spark.read.format(format).option("name", "t1").schema("id bigint").load()
+    }
+    assert(e.getMessage.contains("not support user specified schema"))
+  }
+
+  test("fail on user specified schema when reading - testcat catalog") {
+    sql(s"create table $catalogName.t1 (id bigint) using $format")
+    val e = intercept[IllegalArgumentException] {
+      spark.read.format(format).option("name", "t1").option("catalog", catalogName)
+        .schema("id bigint").load()
+    }
+    assert(e.getMessage.contains("not support user specified schema"))
+  }
+
   private def load(name: String, catalogOpt: Option[String]): DataFrame = {
     val dfr = spark.read.format(format).option("name", "t1")
     catalogOpt.foreach(cName => dfr.option("catalog", cName))
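
For reference, a minimal spark-shell style sketch of how these changes surface through the public DataFrameReader API. The connector name "com.example.CatalogSource" and its "name" option are hypothetical stand-ins for any SupportsCatalogOptions implementation; they are not part of this patch.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().master("local[*]").getOrCreate()

  // A user-specified schema is now rejected up front for any SupportsCatalogOptions source.
  try {
    spark.read
      .format("com.example.CatalogSource") // hypothetical SupportsCatalogOptions connector
      .option("name", "t1")
      .schema("id bigint")                 // triggers the new IllegalArgumentException
      .load()
  } catch {
    case e: IllegalArgumentException =>
      println(e.getMessage) // "... does not support user specified schema. ..."
  }

  // Without .schema(...), the identifier is resolved through the catalog returned by
  // extractCatalog, which now defaults to the session catalog instead of null.
  val df = spark.read
    .format("com.example.CatalogSource")
    .option("name", "t1")
    .load()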