Added partitioning checks
brkyvz committed Dec 16, 2019
1 parent ed9adc8 commit 0a87228
Showing 1 changed file with 18 additions and 5 deletions.
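
A minimal sketch of the user-facing effect of this commit (not part of the diff; the source name and table layout below are placeholders): a `partitionBy` on a `TableProvider`-based `save()` is no longer rejected outright, it only has to match the existing table's partitioning.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object PartitioningCheckDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
        import spark.implicits._

        val df = Seq((1, "US"), (2, "DE")).toDF("id", "region")

        // Before this commit: any partitionBy here threw
        // "Cannot write data to TableProvider implementation if partition columns are specified."
        // After this commit: partitionBy("region") is accepted if the existing table is
        // partitioned by `region`, while a mismatched partitionBy fails the new require()
        // with a message listing both the provided and the table's partitioning.
        df.write
          .format("com.example.demo")   // placeholder V2 TableProvider, not a real source
          .partitionBy("region")
          .mode(SaveMode.Append)
          .save()
      }
    }
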
23 changes: 18 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Catalogs, Identifier, SupportsCatalogOptions, SupportsWrite, TableCatalog, TableProvider, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Catalogs, Identifier, SupportsCatalogOptions, SupportsWrite, Table, TableCatalog, TableProvider, V1Table}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, LiteralValue, Transform}
 import org.apache.spark.sql.execution.SQLExecution
@@ -260,18 +260,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
       provider.getTable(dsOptions) match {
         case table: SupportsWrite if table.supports(BATCH_WRITE) =>
-          if (partitioningColumns.nonEmpty) {
-            throw new AnalysisException("Cannot write data to TableProvider implementation " +
-              "if partition columns are specified.")
-          }
           lazy val relation = DataSourceV2Relation.create(table, dsOptions)
           mode match {
             case SaveMode.Append =>
+              verifyV2Partitioning(table)
               runCommand(df.sparkSession, "save") {
                 AppendData.byName(relation, df.logicalPlan, extraOptions.toMap)
               }

             case SaveMode.Overwrite if table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER) =>
+              verifyV2Partitioning(table)
               // truncate the table
               runCommand(df.sparkSession, "save") {
                 OverwriteByExpression.byName(
@@ -540,6 +538,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         return saveAsTable(TableIdentifier(ident.name(), ident.namespace().headOption))

       case (SaveMode.Append, Some(table)) =>
+        verifyV2Partitioning(table)
         AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan, extraOptions.toMap)

       case (SaveMode.Overwrite, _) =>
@@ -637,6 +636,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       CreateTable(tableDesc, mode, Some(df.logicalPlan)))
   }

+  /** Converts the provided partitioning and bucketing information to DataSourceV2 Transforms. */
   private def getV2Transforms: Seq[Transform] = {
     val partitioning = partitioningColumns.map { colNames =>
       colNames.map(name => IdentityTransform(FieldReference(name)))
@@ -647,6 +647,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     partitioning ++ bucketing
   }

+  /**
+   * For V2 DataSources, checks whether the provided partitioning matches that of the table.
+   * Partitioning information is not required when appending data to V2 tables.
+   */
+  private def verifyV2Partitioning(existingTable: Table): Unit = {
+    val v2Partitions = getV2Transforms
+    if (v2Partitions.isEmpty) return
+    require(v2Partitions.sameElements(existingTable.partitioning()),
+      "The provided partitioning does not match that of the table.\n" +
+        s" - provided: ${v2Partitions.mkString(", ")}\n" +
+        s" - table: ${existingTable.partitioning().mkString(", ")}")
+  }
+
   /**
    * Saves the content of the `DataFrame` to an external database table via JDBC. In the case the
    * table already exists in the external database, behavior of this function depends on the
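For reference, a REPL-style sketch of the comparison `verifyV2Partitioning` performs, written with the public `Expressions` factory from `org.apache.spark.sql.connector.expressions` (the column names and bucket count are made up). `getV2Transforms` turns `partitionBy("region")` into an identity transform and `bucketBy(4, "id")` into a bucket transform, and `sameElements` compares them against the table's reported partitioning element by element, so ordering matters:

    import org.apache.spark.sql.connector.expressions.{Expressions, Transform}

    // Roughly what .partitionBy("region").bucketBy(4, "id") translates to:
    val provided: Seq[Transform] =
      Seq(Expressions.identity("region"), Expressions.bucket(4, "id"))

    // Roughly what an existing table might report via Table.partitioning():
    val tablePartitioning: Array[Transform] =
      Array(Expressions.identity("region"), Expressions.bucket(4, "id"))

    // The check added in this commit is essentially an order-sensitive equality test;
    // a mismatch (or a different ordering) fails the require() and prints both sides.
    assert(provided.sameElements(tablePartitioning))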
