Adds a v1 fallback writer implementation for v2 data source codepaths
brkyvz committed Aug 3, 2019
1 parent b8e13b0 commit d5798fd
Showing 3 changed files with 121 additions and 30 deletions.
@@ -169,10 +169,10 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
catalog match {
case staging: StagingTableCatalog =>
AtomicCreateTableAsSelectExec(
staging, ident, parts, planLater(query), props, writeOptions, ifNotExists) :: Nil
staging, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
case _ =>
CreateTableAsSelectExec(
catalog, ident, parts, planLater(query), props, writeOptions, ifNotExists) :: Nil
catalog, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
}

case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
@@ -191,6 +191,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
staging,
ident,
parts,
query,
planLater(query),
props,
writeOptions,
@@ -200,14 +201,15 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
catalog,
ident,
parts,
query,
planLater(query),
props,
writeOptions,
orCreate = orCreate) :: Nil
}

case AppendData(r: DataSourceV2Relation, query, _) =>
AppendDataExec(r.table.asWritable, r.options, planLater(query)) :: Nil
AppendDataExec(r.table.asWritable, r.options, query, planLater(query)) :: Nil

case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, _) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
@@ -217,7 +219,7 @@
}.toArray

OverwriteByExpressionExec(
r.table.asWritable, filters, r.options, planLater(query)) :: Nil
r.table.asWritable, filters, r.options, query, planLater(query)) :: Nil

case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) =>
OverwritePartitionsDynamicExec(r.table.asWritable, r.options, planLater(query)) :: Nil
@@ -26,16 +26,17 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SaveMode}
import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
import org.apache.spark.sql.sources.{AlwaysTrue, CreatableRelationProvider, Filter}
import org.apache.spark.sql.sources.v2.{StagedTable, SupportsWrite}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{LongAccumulator, Utils}

@@ -63,10 +64,11 @@ case class CreateTableAsSelectExec(
catalog: TableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
plan: LogicalPlan,
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
ifNotExists: Boolean) extends V2TableWriteExec {
ifNotExists: Boolean) extends SupportsV1Write {

import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper

@@ -83,12 +85,17 @@ case class CreateTableAsSelectExec(
catalog.createTable(
ident, query.schema, partitioning.toArray, properties.asJava) match {
case table: SupportsWrite =>
val batchWrite = table.newWriteBuilder(writeOptions)
val writer = table.newWriteBuilder(writeOptions)
.withInputDataSchema(query.schema)
.withQueryId(UUID.randomUUID().toString)
.buildForBatch()

doWrite(batchWrite)
writer match {
case v1: V1WriteBuilder =>
val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
writeWithV1(v1.buildForV1Write(), mode, writeOptions)
case v2 =>
doWrite(v2.buildForBatch())
}

case _ =>
// table does not support writes
@@ -114,6 +121,7 @@ case class AtomicCreateTableAsSelectExec(
catalog: StagingTableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
plan: LogicalPlan,
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
@@ -129,7 +137,8 @@
}
val stagedTable = catalog.stageCreate(
ident, query.schema, partitioning.toArray, properties.asJava)
writeToStagedTable(stagedTable, writeOptions, ident)
val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
writeToStagedTable(stagedTable, writeOptions, ident, mode)
}
}

@@ -147,10 +156,11 @@ case class ReplaceTableAsSelectExec(
catalog: TableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
plan: LogicalPlan,
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
orCreate: Boolean) extends AtomicTableWriteExec {
orCreate: Boolean) extends SupportsV1Write {

import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper

@@ -173,12 +183,16 @@
Utils.tryWithSafeFinallyAndFailureCallbacks({
createdTable match {
case table: SupportsWrite =>
val batchWrite = table.newWriteBuilder(writeOptions)
val writer = table.newWriteBuilder(writeOptions)
.withInputDataSchema(query.schema)
.withQueryId(UUID.randomUUID().toString)
.buildForBatch()

doWrite(batchWrite)
writer match {
case v1: V1WriteBuilder =>
writeWithV1(v1.buildForV1Write(), SaveMode.Overwrite, writeOptions)
case v2 =>
doWrite(v2.buildForBatch())
}

case _ =>
// table does not support writes
@@ -207,6 +221,7 @@ case class AtomicReplaceTableAsSelectExec(
catalog: StagingTableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
plan: LogicalPlan,
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
@@ -227,7 +242,7 @@
} else {
throw new CannotReplaceMissingTableException(ident)
}
writeToStagedTable(staged, writeOptions, ident)
writeToStagedTable(staged, writeOptions, ident, SaveMode.Overwrite)
}
}

@@ -239,11 +254,16 @@ case class AtomicReplaceTableAsSelectExec(
case class AppendDataExec(
table: SupportsWrite,
writeOptions: CaseInsensitiveStringMap,
query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
plan: LogicalPlan,
query: SparkPlan) extends SupportsV1Write with BatchWriteHelper {

override protected def doExecute(): RDD[InternalRow] = {
val batchWrite = newWriteBuilder().buildForBatch()
doWrite(batchWrite)
newWriteBuilder() match {
case v1: V1WriteBuilder =>
writeWithV1(v1.buildForV1Write(), SaveMode.Append, writeOptions)
case v2 =>
doWrite(v2.buildForBatch())
}
}
}

@@ -261,25 +281,26 @@ case class OverwriteByExpressionExec(
table: SupportsWrite,
deleteWhere: Array[Filter],
writeOptions: CaseInsensitiveStringMap,
query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
plan: LogicalPlan,
query: SparkPlan) extends SupportsV1Write with BatchWriteHelper {

private def isTruncate(filters: Array[Filter]): Boolean = {
filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
}

override protected def doExecute(): RDD[InternalRow] = {
val batchWrite = newWriteBuilder() match {
newWriteBuilder() match {
case v1: V1WriteBuilder if isTruncate(deleteWhere) =>
writeWithV1(v1.buildForV1Write(), SaveMode.Overwrite, writeOptions)
case builder: SupportsTruncate if isTruncate(deleteWhere) =>
builder.truncate().buildForBatch()
doWrite(builder.truncate().buildForBatch())

case builder: SupportsOverwrite =>
builder.overwrite(deleteWhere).buildForBatch()
doWrite(builder.overwrite(deleteWhere).buildForBatch())

case _ =>
throw new SparkException(s"Table does not support overwrite by expression: $table")
}

doWrite(batchWrite)
}
}

@@ -463,24 +484,30 @@ object DataWritingSparkTask extends Logging {
}
}

private[v2] trait AtomicTableWriteExec extends V2TableWriteExec {
private[v2] trait AtomicTableWriteExec extends SupportsV1Write {
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper

protected def writeToStagedTable(
stagedTable: StagedTable,
writeOptions: CaseInsensitiveStringMap,
ident: Identifier): RDD[InternalRow] = {
ident: Identifier,
mode: SaveMode): RDD[InternalRow] = {
Utils.tryWithSafeFinallyAndFailureCallbacks({
stagedTable match {
case table: SupportsWrite =>
val batchWrite = table.newWriteBuilder(writeOptions)
val writer = table.newWriteBuilder(writeOptions)
.withInputDataSchema(query.schema)
.withQueryId(UUID.randomUUID().toString)
.buildForBatch()

val writtenRows = doWrite(batchWrite)
val writtenRows = writer match {
case v1: V1WriteBuilder =>
writeWithV1(v1.buildForV1Write(), SaveMode.Overwrite, writeOptions)
case v2 =>
doWrite(v2.buildForBatch())
}
stagedTable.commitStagedChanges()
writtenRows

case _ =>
// Table does not support writes - staged changes are also rolled back below.
throw new SparkException(
@@ -501,3 +528,19 @@ private[v2] case class DataWritingSparkTaskResult(
* Sink progress information collected after commit.
*/
private[sql] case class StreamWriterCommitProgress(numOutputRows: Long)

/**
* A trait that allows Tables that use V1 Writer interfaces to write data.
*/
private trait SupportsV1Write extends V2TableWriteExec {
def plan: LogicalPlan

protected def writeWithV1(
relation: CreatableRelationProvider,
mode: SaveMode,
options: CaseInsensitiveStringMap): RDD[InternalRow] = {
relation.createRelation(
sqlContext, mode, options.asScala.toMap, Dataset.ofRows(sqlContext.sparkSession, plan))
sparkContext.emptyRDD
}
}
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.writer

import org.apache.spark.annotation.{Experimental, Unstable}
import org.apache.spark.sql.sources.CreatableRelationProvider

/**
* A trait that should be implemented by V1 DataSources that would like to leverage the DataSource
* V2 write code paths.
*
* @since 3.0.0
*/
@Experimental
@Unstable
trait V1WriteBuilder extends WriteBuilder {

/**
* Creates a [[CreatableRelationProvider]] that allows saving a DataFrame to a
* destination (using data source-specific parameters).
*
* The relation will receive a string-to-string map of options with case-sensitive keys;
* the data source implementation should therefore handle option checking in a
* case-insensitive manner itself.
*
* @since 3.0.0
*/
def buildForV1Write(): CreatableRelationProvider = {
throw new UnsupportedOperationException(getClass.getName + " does not support batch write")
}
}
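
For illustration, here is a minimal sketch of how a connector could opt in to this fallback. The class and sink below are hypothetical and not part of the commit; the only assumption is that the source's WriteBuilder mixes in V1WriteBuilder and hands back a CreatableRelationProvider that performs the actual write.

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.sources.v2.writer.V1WriteBuilder
import org.apache.spark.sql.types.StructType

// Hypothetical connector-side builder (not part of this commit).
class ExampleV1FallbackWriteBuilder extends V1WriteBuilder {

  // Returning a V1 CreatableRelationProvider makes the exec nodes above take the
  // `case v1: V1WriteBuilder` branch and call writeWithV1 instead of buildForBatch().
  override def buildForV1Write(): CreatableRelationProvider = new CreatableRelationProvider {
    override def createRelation(
        ctx: SQLContext,
        mode: SaveMode,
        parameters: Map[String, String],
        data: DataFrame): BaseRelation = {
      // Hypothetical sink: a real source would write `data` out according to `mode`
      // and the (case-sensitive) `parameters` map; here we only force the computation.
      data.count()
      new BaseRelation {
        override def sqlContext: SQLContext = ctx
        override def schema: StructType = data.schema
      }
    }
  }
}

With such a builder in place, AppendDataExec and the CTAS/RTAS nodes above route the write through SupportsV1Write.writeWithV1, which builds a DataFrame from the logical plan and invokes createRelation with the chosen SaveMode.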
