[SPARK-29618] remove V1_BATCH_WRITE table capability #26281

Closed
@@ -89,14 +89,5 @@ public enum TableCapability {
/**
* Signals that the table accepts input of any schema in a write operation.
*/
ACCEPT_ANY_SCHEMA,

/**
* Signals that the table supports append writes using the V1 InsertableRelation interface.
* <p>
* Tables that return this capability must create a V1WriteBuilder and may also support additional
* write modes, like {@link #TRUNCATE}, and {@link #OVERWRITE_BY_FILTER}, but cannot support
* {@link #OVERWRITE_DYNAMIC}.
*/
V1_BATCH_WRITE
ACCEPT_ANY_SCHEMA
}
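
With V1_BATCH_WRITE gone from the enum, a connector no longer advertises the V1 fallback through its capability set; it declares BATCH_WRITE and returns a V1WriteBuilder from newWriteBuilder. The following is a minimal sketch of the connector side and is not part of this diff; the table class and its names are hypothetical, and it assumes V1WriteBuilder can be mixed in anonymously with only buildForV1Write implemented.

import java.util

import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.write.{V1WriteBuilder, WriteBuilder}
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical connector table: BATCH_WRITE alone is declared now that
// V1_BATCH_WRITE has been removed from the TableCapability enum.
class ExampleV1FallbackTable(relation: InsertableRelation, tableSchema: StructType)
  extends SupportsWrite {

  override def name(): String = "example_v1_fallback"
  override def schema(): StructType = tableSchema
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_WRITE)

  // Returning a V1WriteBuilder is what now routes writes to the V1 fallback plans.
  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
    new V1WriteBuilder {
      override def buildForV1Write(): InsertableRelation = relation
    }
}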
@@ -17,21 +17,19 @@

package org.apache.spark.sql.execution.datasources.v2

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables}
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables}
import org.apache.spark.sql.connector.catalog.StagingTableCatalog
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object DataSourceV2Strategy extends Strategy with PredicateHelper {

@@ -183,14 +181,13 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
CreateTableExec(catalog, ident, schema, parts, props, ifNotExists) :: Nil

case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
AtomicCreateTableAsSelectExec(
staging, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
staging, ident, parts, query, planLater(query), props, options, ifNotExists) :: Nil
case _ =>
CreateTableAsSelectExec(
catalog, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
catalog, ident, parts, query, planLater(query), props, options, ifNotExists) :: Nil
}

case RefreshTable(catalog, ident) =>
@@ -205,7 +202,6 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
}

case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableAsSelectExec(
@@ -215,7 +211,7 @@
query,
planLater(query),
props,
writeOptions,
options,
orCreate = orCreate) :: Nil
case _ =>
ReplaceTableAsSelectExec(
@@ -225,35 +221,10 @@
query,
planLater(query),
props,
writeOptions,
options,
orCreate = orCreate) :: Nil
}

case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
r.table.asWritable match {
case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
AppendDataExecV1(v1, writeOptions.asOptions, query) :: Nil
case v2 =>
AppendDataExec(v2, writeOptions.asOptions, planLater(query)) :: Nil
}

case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
val filters = splitConjunctivePredicates(deleteExpr).map {
filter => DataSourceStrategy.translateFilter(filter).getOrElse(
throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
}.toArray
r.table.asWritable match {
case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, query) :: Nil
case v2 =>
OverwriteByExpressionExec(v2, filters, writeOptions.asOptions, planLater(query)) :: Nil
}

case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _) =>
OverwritePartitionsDynamicExec(
r.table.asWritable, writeOptions.asOptions, planLater(query)) :: Nil

case DeleteFromTable(r: DataSourceV2Relation, condition) =>
if (condition.exists(SubqueryExpression.hasSubquery)) {
throw new AnalysisException(
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
import org.apache.spark.sql.types.BooleanType
@@ -33,10 +32,6 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {

private def failAnalysis(msg: String): Unit = throw new AnalysisException(msg)

private def supportsBatchWrite(table: Table): Boolean = {
table.supportsAny(BATCH_WRITE, V1_BATCH_WRITE)
}

override def apply(plan: LogicalPlan): Unit = {
plan foreach {
case r: DataSourceV2Relation if !r.table.supports(BATCH_READ) =>
@@ -48,7 +43,7 @@

// TODO: check STREAMING_WRITE capability. It's not doable now because we don't have
// a logical plan for streaming write.
case AppendData(r: DataSourceV2Relation, _, _, _) if !supportsBatchWrite(r.table) =>
case AppendData(r: DataSourceV2Relation, _, _, _) if !r.table.supports(BATCH_WRITE) =>
failAnalysis(s"Table ${r.table.name()} does not support append in batch mode.")

case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _)
@@ -58,13 +53,13 @@
case OverwriteByExpression(r: DataSourceV2Relation, expr, _, _, _) =>
expr match {
case Literal(true, BooleanType) =>
if (!supportsBatchWrite(r.table) ||
if (!r.table.supports(BATCH_WRITE) ||
!r.table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER)) {
failAnalysis(
s"Table ${r.table.name()} does not support truncate in batch mode.")
}
case _ =>
if (!supportsBatchWrite(r.table) || !r.table.supports(OVERWRITE_BY_FILTER)) {
if (!r.table.supports(BATCH_WRITE) || !r.table.supports(OVERWRITE_BY_FILTER)) {
failAnalysis(s"Table ${r.table.name()} does not support " +
"overwrite by filter in batch mode.")
}
@@ -17,32 +17,27 @@

package org.apache.spark.sql.execution.datasources.v2

import java.util.UUID

import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.{SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
import org.apache.spark.sql.sources.{Filter, InsertableRelation}

/**
* Physical plan node for append into a v2 table using V1 write interfaces.
*
* Rows in the output data set are appended.
*/
case class AppendDataExecV1(
table: SupportsWrite,
writeOptions: CaseInsensitiveStringMap,
plan: LogicalPlan) extends V1FallbackWriters {
v1Relation: InsertableRelation,
plan: LogicalPlan) extends LeafExecNode with SupportsV1Write {

override def output: Seq[Attribute] = Nil

override protected def doExecute(): RDD[InternalRow] = {
writeWithV1(newWriteBuilder().buildForV1Write())
writeWithV1(v1Relation)
}
}
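
With this change AppendDataExecV1 no longer resolves a write builder from the table and its options; it receives the already-built InsertableRelation from the planner. Below is a minimal sketch of such a relation, purely illustrative and not part of this diff; the class name and in-memory counter are hypothetical.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.sources.InsertableRelation

// Hypothetical sink: the planner obtains this from V1WriteBuilder.buildForV1Write()
// and AppendDataExecV1's doExecute passes it straight to writeWithV1.
class CountingInsertableRelation extends InsertableRelation {
  @volatile var rowsWritten: Long = 0L

  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    // An append never requests overwrite semantics; a real sink would write `data` out here.
    rowsWritten += data.count()
  }
}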

@@ -58,50 +53,14 @@ case class AppendDataExecV1(
* AlwaysTrue to delete all rows.
*/
case class OverwriteByExpressionExecV1(
table: SupportsWrite,
deleteWhere: Array[Filter],
writeOptions: CaseInsensitiveStringMap,
plan: LogicalPlan) extends V1FallbackWriters {

private def isTruncate(filters: Array[Filter]): Boolean = {
filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
}

override protected def doExecute(): RDD[InternalRow] = {
newWriteBuilder() match {
case builder: SupportsTruncate if isTruncate(deleteWhere) =>
writeWithV1(builder.truncate().asV1Builder.buildForV1Write())

case builder: SupportsOverwrite =>
writeWithV1(builder.overwrite(deleteWhere).asV1Builder.buildForV1Write())
v1Relation: InsertableRelation,
deleteWhere: Seq[Filter],
plan: LogicalPlan) extends LeafExecNode with SupportsV1Write {

case _ =>
throw new SparkException(s"Table does not support overwrite by expression: $table")
}
}
}

/** Some helper interfaces that use V2 write semantics through the V1 writer interface. */
sealed trait V1FallbackWriters extends SupportsV1Write {
override def output: Seq[Attribute] = Nil
override final def children: Seq[SparkPlan] = Nil

def table: SupportsWrite
def writeOptions: CaseInsensitiveStringMap

protected implicit class toV1WriteBuilder(builder: WriteBuilder) {
def asV1Builder: V1WriteBuilder = builder match {
case v1: V1WriteBuilder => v1
case other => throw new IllegalStateException(
s"The returned writer ${other} was no longer a V1WriteBuilder.")
}
}

protected def newWriteBuilder(): V1WriteBuilder = {
val writeBuilder = table.newWriteBuilder(writeOptions)
.withInputDataSchema(plan.schema)
.withQueryId(UUID.randomUUID().toString)
writeBuilder.asV1Builder
override protected def doExecute(): RDD[InternalRow] = {
writeWithV1(v1Relation)
}
}

@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import java.util.UUID

import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
import org.apache.spark.sql.types.StructType

object V2WriteStrategy extends Strategy with PredicateHelper {
import DataSourceV2Implicits._

override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case WriteToDataSourceV2(writer, query) =>
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
val writeBuilder = newWriteBuilder(r.table, writeOptions, query.schema)
writeBuilder match {
case v1: V1WriteBuilder =>
AppendDataExecV1(v1.buildForV1Write(), query) :: Nil
case _ =>
AppendDataExec(writeBuilder.buildForBatch(), planLater(query)) :: Nil
}

case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
val filters = splitConjunctivePredicates(deleteExpr).map {
filter => DataSourceStrategy.translateFilter(filter).getOrElse(
throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
}.toArray

val writeBuilder = newWriteBuilder(r.table, writeOptions, query.schema)
val configured = writeBuilder match {
case builder: SupportsTruncate if isTruncate(filters) => builder.truncate()
case builder: SupportsOverwrite => builder.overwrite(filters)
case _ =>
throw new IllegalArgumentException(
s"Table does not support overwrite by expression: ${r.table.name}")
}

configured match {
case v1: V1WriteBuilder =>
OverwriteByExpressionExecV1(v1.buildForV1Write(), filters, query) :: Nil
case _ =>
OverwriteByExpressionExec(configured.buildForBatch(), filters, planLater(query)) :: Nil
}

case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _) =>
val writeBuilder = newWriteBuilder(r.table, writeOptions, query.schema)
val configured = writeBuilder match {
case builder: SupportsDynamicOverwrite =>
builder.overwriteDynamicPartitions()
case _ =>
throw new IllegalArgumentException(
s"Table does not support dynamic partition overwrite: ${r.table.name}")
}
OverwritePartitionsDynamicExec(configured.buildForBatch(), planLater(query)) :: Nil
}

def newWriteBuilder(
table: Table,
options: Map[String, String],
inputSchema: StructType): WriteBuilder = {
table.asWritable.newWriteBuilder(options.asOptions)
.withInputDataSchema(inputSchema)
.withQueryId(UUID.randomUUID().toString)
}

private def isTruncate(filters: Array[Filter]): Boolean = {
filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
}
}
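
For overwrite-by-expression, the strategy treats a single AlwaysTrue filter as a truncate, mirroring the check that previously lived inside OverwriteByExpressionExecV1. A small illustration of that dispatch follows; it is not part of the diff, and the filter values are made up.

import org.apache.spark.sql.sources.{AlwaysTrue, EqualTo, Filter}

object OverwriteDispatchExample {
  // How the translated filters drive the WriteBuilder configuration above:
  val truncateAll: Array[Filter] = Array(AlwaysTrue())      // unconditional overwrite
  val byPartition: Array[Filter] = Array(EqualTo("day", 3)) // overwrite only matching rows

  // isTruncate(truncateAll) == true  -> SupportsTruncate.truncate()
  // isTruncate(byPartition) == false -> SupportsOverwrite.overwrite(byPartition)
}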