Add table capability to do V1_BATCH_WRITE
brkyvz committed Aug 20, 2019
1 parent 9bfb76e commit 00347ee
Showing 5 changed files with 74 additions and 42 deletions.
@@ -89,5 +89,14 @@ public enum TableCapability {
/**
* Signals that the table accepts input of any schema in a write operation.
*/
ACCEPT_ANY_SCHEMA
ACCEPT_ANY_SCHEMA,

/**
* Signals that the table supports append writes using the V1 InsertableRelation interface.
* <p>
* Tables that return this capability must create a V1WriteBuilder and may also support additional
* write modes, like {@link #TRUNCATE}, and {@link #OVERWRITE_BY_FILTER}, but cannot support
* {@link #OVERWRITE_DYNAMIC}.
*/
V1_BATCH_WRITE
}
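
The new capability's contract is easiest to see from a connector's point of view. The following is a minimal sketch, using the package layout in this commit, of a hypothetical table that advertises V1_BATCH_WRITE and hands the planner a V1WriteBuilder; the class name LegacyBackedTable and the body of insert are illustrative placeholders, not part of this change.

import java.util

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.sources.v2.{SupportsWrite, TableCapability}
import org.apache.spark.sql.sources.v2.writer.{V1WriteBuilder, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical table backed by an existing V1 sink.
class LegacyBackedTable(tableSchema: StructType) extends SupportsWrite {
  override def name(): String = "legacy_backed_table"
  override def schema(): StructType = tableSchema

  // Advertising V1_BATCH_WRITE makes the planner pick the V1 fallback
  // exec nodes (AppendDataExecV1, OverwriteByExpressionExecV1).
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.V1_BATCH_WRITE)

  // Per the capability's contract, the returned builder must be a V1WriteBuilder.
  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
    new V1WriteBuilder {
      override def buildForV1Write(): InsertableRelation = new InsertableRelation {
        override def insert(data: DataFrame, overwrite: Boolean): Unit = {
          // Placeholder: hand `data` to the source's existing V1 write path.
          data.collect()
        }
      }
    }
}

Once the capability is declared, the updated DataSourceV2Strategy below routes AppendData through AppendDataExecV1 rather than building a V2 BatchWrite at planning time.
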
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources
import org.apache.spark.sql.sources.v2.TableCapability
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.sources.v2.writer.V1WriteBuilder
@@ -212,15 +213,11 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
}

case AppendData(r: DataSourceV2Relation, query, _) =>
val table = r.table.asWritable
val writer = table.newWriteBuilder(r.options)
.withInputDataSchema(query.schema)
.withQueryId(UUID.randomUUID().toString)
writer match {
case v1: V1WriteBuilder =>
AppendDataExecV1(v1, query) :: Nil
r.table.asWritable match {
case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
AppendDataExecV1(v1, r.options, query) :: Nil
case v2 =>
AppendDataExec(table, v2, planLater(query)) :: Nil
AppendDataExec(v2, r.options, planLater(query)) :: Nil
}

case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, _) =>
@@ -229,15 +226,11 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse(
throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
}.toArray
val table = r.table.asWritable
val writer = table.newWriteBuilder(r.options)
.withInputDataSchema(query.schema)
.withQueryId(UUID.randomUUID().toString)
writer match {
case v1: V1WriteBuilder =>
OverwriteByExpressionExecV1(table, v1, filters, query) :: Nil
r.table.asWritable match {
case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
OverwriteByExpressionExecV1(v1, filters, r.options, query) :: Nil
case v2 =>
OverwriteByExpressionExec(table, v2, filters, planLater(query)) :: Nil
OverwriteByExpressionExec(v2, filters, r.options, planLater(query)) :: Nil
}

case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) =>
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources.v2

import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.spark.SparkException
@@ -27,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources.{AlwaysTrue, CreatableRelationProvider, Filter, InsertableRelation}
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.sources.v2.{SupportsWrite, Table}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -37,11 +39,12 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* Rows in the output data set are appended.
*/
case class AppendDataExecV1(
writeBuilder: V1WriteBuilder,
table: SupportsWrite,
writeOptions: CaseInsensitiveStringMap,
plan: LogicalPlan) extends V1FallbackWriters {

override protected def doExecute(): RDD[InternalRow] = {
writeWithV1(writeBuilder.buildForV1Write())
writeWithV1(newWriteBuilder().buildForV1Write())
}
}

@@ -57,17 +60,17 @@ case class AppendDataExecV1(
* AlwaysTrue to delete all rows.
*/
case class OverwriteByExpressionExecV1(
table: Table,
writeBuilder: V1WriteBuilder,
table: SupportsWrite,
deleteWhere: Array[Filter],
writeOptions: CaseInsensitiveStringMap,
plan: LogicalPlan) extends V1FallbackWriters {

private def isTruncate(filters: Array[Filter]): Boolean = {
filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
}

override protected def doExecute(): RDD[InternalRow] = {
writeBuilder match {
newWriteBuilder() match {
case builder: SupportsTruncate if isTruncate(deleteWhere) =>
writeWithV1(builder.truncate().asV1Builder.buildForV1Write())

@@ -92,14 +95,23 @@ sealed trait V1FallbackWriters extends SupportsV1Write {
s"The returned writer ${other} was no longer a V1WriteBuilder.")
}
}

protected def newWriteBuilder(): V1WriteBuilder = {
val writeBuilder = table.newWriteBuilder(writeOptions)
.withInputDataSchema(plan.schema)
.withQueryId(UUID.randomUUID().toString)
writeBuilder.asV1Builder
}
}

/**
* A trait that allows Tables that use V1 Writer interfaces to append data.
*/
trait SupportsV1Write extends SparkPlan {
def table: SupportsWrite
// TODO: We should be able to work on SparkPlans at this point.
def plan: LogicalPlan
def writeOptions: CaseInsensitiveStringMap

protected def writeWithV1(relation: InsertableRelation): RDD[InternalRow] = {
relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false)
@@ -90,7 +90,7 @@ case class CreateTableAsSelectExec(

writeBuilder match {
case v1: V1WriteBuilder => writeWithV1(v1.buildForV1Write())
case v2 => doWrite(v2.buildForBatch())
case v2 => writeWithV2(v2.buildForBatch())
}

case _ =>
Expand Down Expand Up @@ -184,7 +184,7 @@ case class ReplaceTableAsSelectExec(

writeBuilder match {
case v1: V1WriteBuilder => writeWithV1(v1.buildForV1Write())
case v2 => doWrite(v2.buildForBatch())
case v2 => writeWithV2(v2.buildForBatch())
}

case _ =>
Expand Down Expand Up @@ -246,11 +246,11 @@ case class AtomicReplaceTableAsSelectExec(
*/
case class AppendDataExec(
table: SupportsWrite,
writeBuilder: WriteBuilder,
query: SparkPlan) extends V2TableWriteExec {
writeOptions: CaseInsensitiveStringMap,
query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {

override protected def doExecute(): RDD[InternalRow] = {
doWrite(writeBuilder.buildForBatch())
writeWithV2(newWriteBuilder().buildForBatch())
}
}

Expand All @@ -266,21 +266,21 @@ case class AppendDataExec(
*/
case class OverwriteByExpressionExec(
table: SupportsWrite,
writeBuilder: WriteBuilder,
deleteWhere: Array[Filter],
query: SparkPlan) extends V2TableWriteExec {
writeOptions: CaseInsensitiveStringMap,
query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {

private def isTruncate(filters: Array[Filter]): Boolean = {
filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
}

override protected def doExecute(): RDD[InternalRow] = {
writeBuilder match {
newWriteBuilder() match {
case builder: SupportsTruncate if isTruncate(deleteWhere) =>
doWrite(builder.truncate().buildForBatch())
writeWithV2(builder.truncate().buildForBatch())

case builder: SupportsOverwrite =>
doWrite(builder.overwrite(deleteWhere).buildForBatch())
writeWithV2(builder.overwrite(deleteWhere).buildForBatch())

case _ =>
throw new SparkException(s"Table does not support overwrite by expression: $table")
@@ -300,15 +300,12 @@ case class OverwriteByExpressionExec(
case class OverwritePartitionsDynamicExec(
table: SupportsWrite,
writeOptions: CaseInsensitiveStringMap,
query: SparkPlan) extends V2TableWriteExec {
query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {

override protected def doExecute(): RDD[InternalRow] = {
val writeBuilder = table.newWriteBuilder(writeOptions)
.withInputDataSchema(query.schema)
.withQueryId(UUID.randomUUID().toString)
writeBuilder match {
newWriteBuilder() match {
case builder: SupportsDynamicOverwrite =>
doWrite(builder.overwriteDynamicPartitions().buildForBatch())
writeWithV2(builder.overwriteDynamicPartitions().buildForBatch())

case _ =>
throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
@@ -323,7 +320,22 @@ case class WriteToDataSourceV2Exec(
def writeOptions: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()

override protected def doExecute(): RDD[InternalRow] = {
doWrite(batchWrite)
writeWithV2(batchWrite)
}
}

/**
* Helper for physical plans that build batch writes.
*/
trait BatchWriteHelper {
def table: SupportsWrite
def query: SparkPlan
def writeOptions: CaseInsensitiveStringMap

def newWriteBuilder(): WriteBuilder = {
table.newWriteBuilder(writeOptions)
.withInputDataSchema(query.schema)
.withQueryId(UUID.randomUUID().toString)
}
}

Expand All @@ -338,7 +350,7 @@ trait V2TableWriteExec extends UnaryExecNode {
override def child: SparkPlan = query
override def output: Seq[Attribute] = Nil

protected def doWrite(batchWrite: BatchWrite): RDD[InternalRow] = {
protected def writeWithV2(batchWrite: BatchWrite): RDD[InternalRow] = {
val writerFactory = batchWrite.createBatchWriterFactory()
val useCommitCoordinator = batchWrite.useCommitCoordinator
val rdd = query.execute()
@@ -470,7 +482,7 @@ private[v2] trait AtomicTableWriteExec extends V2TableWriteExec with SupportsV1Write {

val writtenRows = writeBuilder match {
case v1: V1WriteBuilder => writeWithV1(v1.buildForV1Write())
case v2 => doWrite(v2.buildForBatch())
case v2 => writeWithV2(v2.buildForBatch())
}
stagedTable.commitStagedChanges()
writtenRows
@@ -19,6 +19,7 @@ package org.apache.spark.sql.sources.v2.writer

import org.apache.spark.annotation.{Experimental, Unstable}
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite

/**
* A trait that should be implemented by V1 DataSources that would like to leverage the DataSource
@@ -44,4 +45,9 @@ trait V1WriteBuilder extends WriteBuilder {
* @since 3.0.0
*/
def buildForV1Write(): InsertableRelation

// These methods cannot be implemented by a V1WriteBuilder.
override final def buildForBatch(): BatchWrite = super.buildForBatch()

override final def buildForStreaming(): StreamingWrite = super.buildForStreaming()
}
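
As a companion sketch (also hypothetical, not part of this commit), a V1WriteBuilder can mix in SupportsTruncate to handle the extra write modes mentioned in the V1_BATCH_WRITE Javadoc. The key point is that truncate() must still return a V1WriteBuilder, otherwise asV1Builder in V1FallbackWriters fails with "The returned writer ... was no longer a V1WriteBuilder."

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.sources.v2.writer.{SupportsTruncate, V1WriteBuilder, WriteBuilder}

// Hypothetical builder for a toy in-memory sink.
class InMemoryV1WriteBuilder(sink: ArrayBuffer[Row])
  extends V1WriteBuilder with SupportsTruncate {

  private var truncateFirst = false

  // Keep returning a V1WriteBuilder from truncate() so asV1Builder still succeeds.
  override def truncate(): WriteBuilder = {
    truncateFirst = true
    this
  }

  override def buildForV1Write(): InsertableRelation = new InsertableRelation {
    override def insert(data: DataFrame, overwrite: Boolean): Unit = {
      if (truncateFirst) sink.clear()
      sink ++= data.collect()
    }
  }
}

The InMemoryV1WriteBuilder name and the in-memory buffer are stand-ins for whatever V1 write path a real source already has.
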
