
Commit 963133e: address comments
brkyvz committed Jan 9, 2020
1 parent 12f4ce4 commit 963133e
Showing 4 changed files with 24 additions and 5 deletions.

SupportsCatalogOptions.java
@@ -48,6 +48,6 @@ public interface SupportsCatalogOptions extends TableProvider {
    * topic name, etc. It's an immutable case-insensitive string-to-string map.
    */
   default String extractCatalog(CaseInsensitiveStringMap options) {
-    return null;
+    return CatalogManager.SESSION_CATALOG_NAME();
   }
 }
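
For context, a connector opts into this resolution path by implementing the interface. Below is a minimal sketch, not taken from this commit: the class name and the "name" option key are assumptions chosen to mirror the test suite further down, and TableProvider's remaining members are elided, so the class is left abstract.

import org.apache.spark.sql.connector.catalog.{Identifier, SupportsCatalogOptions}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical provider: reads the table identifier from a "name" option.
abstract class CatalogAwareProvider extends SupportsCatalogOptions {
  override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier =
    Identifier.of(Array.empty[String], options.get("name"))

  // extractCatalog is deliberately not overridden: after this change the
  // default resolves to the session catalog ("spark_catalog") rather than
  // null, so simple connectors no longer need to override it.
}
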
DataFrameReader.scala
@@ -216,6 +216,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption
     val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
     val table = provider match {
+      case _: SupportsCatalogOptions if userSpecifiedSchema.nonEmpty =>
+        throw new IllegalArgumentException(
+          s"$source does not support user specified schema. Please don't specify the schema.")
       case hasCatalog: SupportsCatalogOptions =>
         val ident = hasCatalog.extractIdentifier(dsOptions)
         val catalog = CatalogV2Util.getTableProviderCatalog(
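
The user-facing effect of the new match arm: a read that passes an explicit schema to a SupportsCatalogOptions source now fails fast. A sketch of what that looks like; the format name is a stand-in, and the option key follows the test suite below.

// Throws IllegalArgumentException after this change.
spark.read
  .format("catalog-aware-format") // assumed name of a SupportsCatalogOptions source
  .option("name", "t1")
  .schema("id bigint") // user-specified schemas are now rejected up front
  .load()
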
DataFrameWriter.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Util, Identifier, SupportsCatalogOptions, SupportsWrite, Table, TableCatalog, TableProvider, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, SupportsWrite, Table, TableCatalog, TableProvider, V1Table}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, LiteralValue, Transform}
 import org.apache.spark.sql.execution.SQLExecution
@@ -661,9 +661,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val partitioning = partitioningColumns.map { colNames =>
       colNames.map(name => IdentityTransform(FieldReference(name)))
     }.getOrElse(Seq.empty[Transform])
-    val bucketing = bucketColumnNames.map { cols =>
-      Seq(BucketTransform(LiteralValue(numBuckets.get, IntegerType), cols.map(FieldReference(_))))
-    }.getOrElse(Seq.empty[Transform])
+    val bucketing =
+      getBucketSpec.map(spec => CatalogV2Implicits.BucketSpecHelper(spec).asTransform).toSeq
     partitioning ++ bucketing
   }
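
The rewritten bucketing path reuses the existing BucketSpecHelper implicit instead of assembling a BucketTransform by hand, which also picks up its validation (specs with sort columns cannot be expressed as a transform and are rejected). A rough sketch of the conversion, assuming CatalogV2Implicits behaves as in current Spark; note these helpers are private[sql], so this only runs inside Spark's own sql packages.

import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits

// An 8-bucket spec on column "id" with no sort columns.
val spec = BucketSpec(numBuckets = 8, bucketColumnNames = Seq("id"), sortColumnNames = Nil)
// The same call the new DataFrameWriter code makes; yields a V2 bucket Transform.
val bucketTransform = CatalogV2Implicits.BucketSpecHelper(spec).asTransform
assert(bucketTransform.name == "bucket")
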
SupportsCatalogOptionsSuite.scala
@@ -179,6 +179,23 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
     checkAnswer(load("t1", Some(catalogName)), df2.toDF())
   }
 
+  test("fail on user specified schema when reading - session catalog") {
+    sql(s"create table t1 (id bigint) using $format")
+    val e = intercept[IllegalArgumentException] {
+      spark.read.format(format).option("name", "t1").schema("id bigint").load()
+    }
+    assert(e.getMessage.contains("not support user specified schema"))
+  }
+
+  test("fail on user specified schema when reading - testcat catalog") {
+    sql(s"create table $catalogName.t1 (id bigint) using $format")
+    val e = intercept[IllegalArgumentException] {
+      spark.read.format(format).option("name", "t1").option("catalog", catalogName)
+        .schema("id bigint").load()
+    }
+    assert(e.getMessage.contains("not support user specified schema"))
+  }
+
   private def load(name: String, catalogOpt: Option[String]): DataFrame = {
     val dfr = spark.read.format(format).option("name", "t1")
     catalogOpt.foreach(cName => dfr.option("catalog", cName))
