remove V1_BATCH_WRITE table capability
cloud-fan committed Oct 28, 2019
1 parent 50cf484 commit b973fbd
Showing 8 changed files with 166 additions and 227 deletions.
@@ -89,14 +89,5 @@ public enum TableCapability {
/**
* Signals that the table accepts input of any schema in a write operation.
*/
ACCEPT_ANY_SCHEMA,

/**
* Signals that the table supports append writes using the V1 InsertableRelation interface.
* <p>
* Tables that return this capability must create a V1WriteBuilder and may also support additional
* write modes, like {@link #TRUNCATE}, and {@link #OVERWRITE_BY_FILTER}, but cannot support
* {@link #OVERWRITE_DYNAMIC}.
*/
V1_BATCH_WRITE
ACCEPT_ANY_SCHEMA
}
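
For context on what the removed capability implied: after this change a table no longer advertises V1_BATCH_WRITE; it declares the ordinary write capabilities (BATCH_WRITE, plus optional ones such as TRUNCATE or OVERWRITE_BY_FILTER), and the V1 fallback is instead detected from the WriteBuilder the table returns (see the new V2WriteStrategy later in this diff). A minimal sketch of such a table, using hypothetical class names that are not part of this commit:

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.write.WriteBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical table: it advertises plain BATCH_WRITE (and TRUNCATE) rather
// than the removed V1_BATCH_WRITE capability.
class ExampleTable extends SupportsWrite {
  override def name(): String = "example"

  override def schema(): StructType = new StructType().add("value", "string")

  override def capabilities(): util.Set[TableCapability] =
    Set(TableCapability.BATCH_WRITE, TableCapability.TRUNCATE).asJava

  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
    new WriteBuilder {} // placeholder; a real table returns a builder that can build a batch write
}
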
@@ -17,21 +17,19 @@

package org.apache.spark.sql.execution.datasources.v2

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables}
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables}
import org.apache.spark.sql.connector.catalog.StagingTableCatalog
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object DataSourceV2Strategy extends Strategy with PredicateHelper {

@@ -183,14 +181,13 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
CreateTableExec(catalog, ident, schema, parts, props, ifNotExists) :: Nil

case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
AtomicCreateTableAsSelectExec(
staging, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
staging, ident, parts, query, planLater(query), props, options, ifNotExists) :: Nil
case _ =>
CreateTableAsSelectExec(
catalog, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
catalog, ident, parts, query, planLater(query), props, options, ifNotExists) :: Nil
}

case RefreshTable(catalog, ident) =>
@@ -205,7 +202,6 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
}

case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableAsSelectExec(
@@ -215,7 +211,7 @@
query,
planLater(query),
props,
writeOptions,
options,
orCreate = orCreate) :: Nil
case _ =>
ReplaceTableAsSelectExec(
@@ -225,35 +221,10 @@
query,
planLater(query),
props,
writeOptions,
options,
orCreate = orCreate) :: Nil
}

case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
r.table.asWritable match {
case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
AppendDataExecV1(v1, writeOptions.asOptions, query) :: Nil
case v2 =>
AppendDataExec(v2, writeOptions.asOptions, planLater(query)) :: Nil
}

case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
val filters = splitConjunctivePredicates(deleteExpr).map {
filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse(
throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
}.toArray
r.table.asWritable match {
case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, query) :: Nil
case v2 =>
OverwriteByExpressionExec(v2, filters, writeOptions.asOptions, planLater(query)) :: Nil
}

case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _) =>
OverwritePartitionsDynamicExec(
r.table.asWritable, writeOptions.asOptions, planLater(query)) :: Nil

case DeleteFromTable(r: DataSourceV2Relation, condition) =>
if (condition.exists(SubqueryExpression.hasSubquery)) {
throw new AnalysisException(
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
import org.apache.spark.sql.types.BooleanType
@@ -33,10 +32,6 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {

private def failAnalysis(msg: String): Unit = throw new AnalysisException(msg)

private def supportsBatchWrite(table: Table): Boolean = {
table.supportsAny(BATCH_WRITE, V1_BATCH_WRITE)
}

override def apply(plan: LogicalPlan): Unit = {
plan foreach {
case r: DataSourceV2Relation if !r.table.supports(BATCH_READ) =>
@@ -48,7 +43,7 @@

// TODO: check STREAMING_WRITE capability. It's not doable now because we don't have a
// a logical plan for streaming write.
case AppendData(r: DataSourceV2Relation, _, _, _) if !supportsBatchWrite(r.table) =>
case AppendData(r: DataSourceV2Relation, _, _, _) if !r.table.supports(BATCH_WRITE) =>
failAnalysis(s"Table ${r.table.name()} does not support append in batch mode.")

case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _)
@@ -58,13 +53,13 @@
case OverwriteByExpression(r: DataSourceV2Relation, expr, _, _, _) =>
expr match {
case Literal(true, BooleanType) =>
if (!supportsBatchWrite(r.table) ||
if (!r.table.supports(BATCH_WRITE) ||
!r.table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER)) {
failAnalysis(
s"Table ${r.table.name()} does not support truncate in batch mode.")
}
case _ =>
if (!supportsBatchWrite(r.table) || !r.table.supports(OVERWRITE_BY_FILTER)) {
if (!r.table.supports(BATCH_WRITE) || !r.table.supports(OVERWRITE_BY_FILTER)) {
failAnalysis(s"Table ${r.table.name()} does not support " +
"overwrite by filter in batch mode.")
}
@@ -17,32 +17,27 @@

package org.apache.spark.sql.execution.datasources.v2

import java.util.UUID

import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.{SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
import org.apache.spark.sql.sources.{Filter, InsertableRelation}

/**
* Physical plan node for append into a v2 table using V1 write interfaces.
*
* Rows in the output data set are appended.
*/
case class AppendDataExecV1(
table: SupportsWrite,
writeOptions: CaseInsensitiveStringMap,
plan: LogicalPlan) extends V1FallbackWriters {
v1Relation: InsertableRelation,
plan: LogicalPlan) extends LeafExecNode with SupportsV1Write {

override def output: Seq[Attribute] = Nil

override protected def doExecute(): RDD[InternalRow] = {
writeWithV1(newWriteBuilder().buildForV1Write())
writeWithV1(v1Relation)
}
}

@@ -58,57 +53,22 @@ case class AppendDataExecV1(
* AlwaysTrue to delete all rows.
*/
case class OverwriteByExpressionExecV1(
table: SupportsWrite,
deleteWhere: Array[Filter],
writeOptions: CaseInsensitiveStringMap,
plan: LogicalPlan) extends V1FallbackWriters {

private def isTruncate(filters: Array[Filter]): Boolean = {
filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
}

override protected def doExecute(): RDD[InternalRow] = {
newWriteBuilder() match {
case builder: SupportsTruncate if isTruncate(deleteWhere) =>
writeWithV1(builder.truncate().asV1Builder.buildForV1Write())

case builder: SupportsOverwrite =>
writeWithV1(builder.overwrite(deleteWhere).asV1Builder.buildForV1Write())
v1Relation: InsertableRelation,
deleteWhere: Seq[Filter],
plan: LogicalPlan) extends LeafExecNode with SupportsV1Write {

case _ =>
throw new SparkException(s"Table does not support overwrite by expression: $table")
}
}
}

/** Some helper interfaces that use V2 write semantics through the V1 writer interface. */
sealed trait V1FallbackWriters extends SupportsV1Write {
override def output: Seq[Attribute] = Nil
override final def children: Seq[SparkPlan] = Nil

def table: SupportsWrite
def writeOptions: CaseInsensitiveStringMap

protected implicit class toV1WriteBuilder(builder: WriteBuilder) {
def asV1Builder: V1WriteBuilder = builder match {
case v1: V1WriteBuilder => v1
case other => throw new IllegalStateException(
s"The returned writer ${other} was no longer a V1WriteBuilder.")
}
}

protected def newWriteBuilder(): V1WriteBuilder = {
val writeBuilder = table.newWriteBuilder(writeOptions)
.withInputDataSchema(plan.schema)
.withQueryId(UUID.randomUUID().toString)
writeBuilder.asV1Builder
override protected def doExecute(): RDD[InternalRow] = {
writeWithV1(v1Relation)
}
}

/**
* A trait that allows Tables that use V1 Writer interfaces to append data.
*/
trait SupportsV1Write extends SparkPlan {

// TODO: We should be able to work on SparkPlans at this point.
def plan: LogicalPlan

@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import java.util.UUID

import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
import org.apache.spark.sql.types.StructType

object V2WriteStrategy extends Strategy with PredicateHelper {
import DataSourceV2Implicits._

override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case WriteToDataSourceV2(writer, query) =>
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
val writeBuilder = newWriteBuilder(r.table, writeOptions, query.schema)
writeBuilder match {
case v1: V1WriteBuilder =>
AppendDataExecV1(v1.buildForV1Write(), query) :: Nil
case _ =>
AppendDataExec(writeBuilder.buildForBatch(), planLater(query)) :: Nil
}

case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
val filters = splitConjunctivePredicates(deleteExpr).map {
filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse(
throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
}.toArray

val writeBuilder = newWriteBuilder(r.table, writeOptions, query.schema)
val configured = writeBuilder match {
case builder: SupportsTruncate if isTruncate(filters) => builder.truncate()
case builder: SupportsOverwrite => builder.overwrite(filters)
case _ =>
throw new IllegalArgumentException(
s"Table does not support overwrite by expression: ${r.table.name}")
}

configured match {
case v1: V1WriteBuilder =>
OverwriteByExpressionExecV1(v1.buildForV1Write(), filters, query) :: Nil
case _ =>
OverwriteByExpressionExec(configured.buildForBatch(), filters, planLater(query)) :: Nil
}

case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _) =>
val writeBuilder = newWriteBuilder(r.table, writeOptions, query.schema)
val configured = writeBuilder match {
case builder: SupportsDynamicOverwrite =>
builder.overwriteDynamicPartitions()
case _ =>
throw new IllegalArgumentException(
s"Table does not support dynamic partition overwrite: ${r.table.name}")
}
OverwritePartitionsDynamicExec(configured.buildForBatch(), planLater(query)) :: Nil
}

def newWriteBuilder(
table: Table,
options: Map[String, String],
inputSchema: StructType): WriteBuilder = {
table.asWritable.newWriteBuilder(options.asOptions)
.withInputDataSchema(inputSchema)
.withQueryId(UUID.randomUUID().toString)
}

private def isTruncate(filters: Array[Filter]): Boolean = {
filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
}
}
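
The strategy added above picks the V1 or V2 write path by matching on the WriteBuilder returned from newWriteBuilder: a V1WriteBuilder is asked for an InsertableRelation via buildForV1Write(), which the reworked AppendDataExecV1 and OverwriteByExpressionExecV1 nodes now receive directly. A minimal sketch of such a builder, with hypothetical names and a placeholder sink:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.connector.write.V1WriteBuilder
import org.apache.spark.sql.sources.InsertableRelation

// Hypothetical builder: because it is a V1WriteBuilder, the strategy above
// routes the write through AppendDataExecV1 / OverwriteByExpressionExecV1
// instead of building a BatchWrite.
class ExampleV1WriteBuilder extends V1WriteBuilder {
  override def buildForV1Write(): InsertableRelation = new InsertableRelation {
    override def insert(data: DataFrame, overwrite: Boolean): Unit = {
      // Placeholder sink: a real source would write through its existing
      // pre-DSv2 path (e.g. JDBC), honoring the `overwrite` flag.
      println(s"writing ${data.count()} rows, overwrite=$overwrite")
    }
  }
}
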