[SPARK-23203][SQL]: DataSourceV2: Use immutable logical plans. #20387

Closed
@@ -17,20 +17,9 @@

package org.apache.spark.sql.kafka010

import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger

import org.scalatest.time.SpanSugar._
import scala.collection.mutable
import scala.util.Random

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.streaming.Trigger

// Run tests in KafkaSourceSuiteBase in continuous execution mode.
class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest
@@ -71,7 +60,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.logical.collectFirst {
case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
}.exists { r =>
// Ensure the new topic is present and the old topic is gone.
r.knownPartitions.exists(_.topic == topic2)
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.streaming.Trigger
@@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.logical.collectFirst {
case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
}.exists(_.knownPartitions.size == newCount),
s"query never reconfigured to $newCount partitions")
}
@@ -35,7 +35,7 @@ import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Dataset, ForeachWriter}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.functions.{count, window}
@@ -119,7 +119,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
} ++ (query.get.lastExecution match {
case null => Seq()
case e => e.logical.collect {
case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
case StreamingDataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
}
})
}.distinct
41 changes: 9 additions & 32 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.{DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

@@ -189,39 +189,16 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val ds = cls.newInstance()
val options = new DataSourceOptions((extraOptions ++
DataSourceV2Utils.extractSessionConfigs(
ds = ds.asInstanceOf[DataSourceV2],
conf = sparkSession.sessionState.conf)).asJava)

// Streaming also uses the data source V2 API. So it may be that the data source implements
// v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
// the dataframe as a v1 source.
val reader = (ds, userSpecifiedSchema) match {
case (ds: ReadSupportWithSchema, Some(schema)) =>
ds.createReader(schema, options)

case (ds: ReadSupport, None) =>
ds.createReader(options)

case (ds: ReadSupportWithSchema, None) =>
throw new AnalysisException(s"A schema needs to be specified when using $ds.")

case (ds: ReadSupport, Some(schema)) =>
val reader = ds.createReader(options)
if (reader.readSchema() != schema) {
throw new AnalysisException(s"$ds does not allow user-specified schemas.")
}
reader

case _ => null // fall back to v1
}
val ds = cls.newInstance().asInstanceOf[DataSourceV2]
if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) {
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
ds = ds, conf = sparkSession.sessionState.conf)
Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
ds, extraOptions.toMap ++ sessionOptions,
userSpecifiedSchema = userSpecifiedSchema))

if (reader == null) {
loadV1Source(paths: _*)
} else {
Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
loadV1Source(paths: _*)
}
} else {
loadV1Source(paths: _*)
@@ -17,17 +17,80 @@

package org.apache.spark.sql.execution.datasources.v2

import scala.collection.JavaConverters._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
import org.apache.spark.sql.types.StructType

case class DataSourceV2Relation(
output: Seq[AttributeReference],
reader: DataSourceReader)
extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
source: DataSourceV2,
options: Map[String, String],
projection: Seq[AttributeReference],
filters: Option[Seq[Expression]] = None,
userSpecifiedSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation {

import DataSourceV2Relation._

override def simpleString: String = {
s"DataSourceV2Relation(source=${source.name}, " +
s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " +
s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
}

override lazy val schema: StructType = reader.readSchema()

override lazy val output: Seq[AttributeReference] = {
Contributor:
This reverts #20485; can we still pass the test?

Contributor Author:
I'll have a look. I didn't realize you'd committed that one already.

Contributor:

I pulled your code and played with it. Your PR does fix the bug, but in a hacky way. Let me explain what happens:

  1. QueryPlan.canonicalized is called, and every expression in DataSourceV2Relation is canonicalized, including DataSourceV2Relation.projection. This means the attributes in projection are all renamed to "none".
  2. DataSourceV2Relation.output is called, which triggers the creation of the reader and applies filter push-down and column pruning. Note that because all attributes are renamed to "none", we are actually pushing invalid filters and columns to data sources.
  3. reader.schema and projection are lined up to get the actual output. Because all names are "none", it works.

However, step 2 is pretty dangerous: Spark doesn't define the behavior of pushing invalid filters and columns, especially what reader.schema should return after invalid columns are pushed down.

I prefer my original fix, which puts output in DataSourceV2Relation's constructor parameters and updates it when doing column pruning in PushDownOperatorsToDataSource.
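
A minimal sketch (not code from this PR or the reviewer) of the alternative design described above, using a hypothetical AltDataSourceV2Relation that carries output as a constructor parameter, with a pruning helper of the kind a rule like PushDownOperatorsToDataSource could apply:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

// Hypothetical relation: output is fixed at construction time, so canonicalizing the
// plan never needs to create or configure a reader to know the plan's output.
case class AltDataSourceV2Relation(
    output: Seq[AttributeReference],
    reader: DataSourceReader) extends LeafNode

object AltDataSourceV2Relation {
  // Column pruning pushes the required columns to the reader and returns a copy of the
  // relation with the pruned output, instead of relying on a lazy val inside the relation.
  def pruneColumns(
      relation: AltDataSourceV2Relation,
      required: Seq[AttributeReference]): AltDataSourceV2Relation = relation.reader match {
    case r: SupportsPushDownRequiredColumns =>
      r.pruneColumns(StructType.fromAttributes(required))
      relation.copy(output = required)
    case _ =>
      relation
  }
}
```

In a design like this, canonicalization only renames the already-known output attributes and cannot trigger push-down with "none"-named columns.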

Contributor Author (@rdblue, Feb 9, 2018):
I agree that it's a bad idea to run push-down here. I fixed this by implementing doCanonicalize and returning a node that overrides the output val. I think that is cleaner than pulling the logic outside of the relation. There's no need for every place that creates a relation to have to get the output of a reader, which is the only way to determine what the node's output will be.

// use the projection attributes to avoid assigning new ids. fields that are not projected
// will be assigned new ids, which is okay because they are not projected.
val attrMap = projection.map(a => a.name -> a).toMap
schema.map(f => attrMap.getOrElse(f.name,
AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
}

private lazy val v2Options: DataSourceOptions = makeV2Options(options)

lazy val (
reader: DataSourceReader,
unsupportedFilters: Seq[Expression],
pushedFilters: Seq[Expression]) = {
val newReader = userSpecifiedSchema match {
case Some(s) =>
source.asReadSupportWithSchema.createReader(s, v2Options)
case _ =>
source.asReadSupport.createReader(v2Options)
}

DataSourceV2Relation.pushRequiredColumns(newReader, projection.toStructType)

override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]
val (remainingFilters, pushedFilters) = filters match {
case Some(filterSeq) =>
DataSourceV2Relation.pushFilters(newReader, filterSeq)
case _ =>
(Nil, Nil)
}

(newReader, remainingFilters, pushedFilters)
}

override def doCanonicalize(): LogicalPlan = {
val c = super.doCanonicalize().asInstanceOf[DataSourceV2Relation]

// override output with canonicalized output to avoid attempting to configure a reader
val canonicalOutput: Seq[AttributeReference] = this.output
.map(a => QueryPlan.normalizeExprId(a, projection))

new DataSourceV2Relation(c.source, c.options, c.projection) {
Contributor:

This is hacky but I don't have a better idea now; let's revisit it later.

override lazy val output: Seq[AttributeReference] = canonicalOutput
}
}

override def computeStats(): Statistics = reader match {
case r: SupportsReportStatistics =>
@@ -37,22 +37,100 @@ case class DataSourceV2Relation(
}

override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
// projection is used to maintain id assignment.
// if projection is not set, use output so the copy is not equal to the original
copy(projection = projection.map(_.newInstance()))
}
}

/**
* A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
* to the non-streaming relation.
*/
class StreamingDataSourceV2Relation(
case class StreamingDataSourceV2Relation(
output: Seq[AttributeReference],
reader: DataSourceReader) extends DataSourceV2Relation(output, reader) {
reader: DataSourceReader)
extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation {
override def isStreaming: Boolean = true

override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation]

override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))

override def computeStats(): Statistics = reader match {
case r: SupportsReportStatistics =>
Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
case _ =>
Statistics(sizeInBytes = conf.defaultSizeInBytes)
}
}

object DataSourceV2Relation {
def apply(reader: DataSourceReader): DataSourceV2Relation = {
new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
private implicit class SourceHelpers(source: DataSourceV2) {
def asReadSupport: ReadSupport = {
source match {
case support: ReadSupport =>
support
case _: ReadSupportWithSchema =>
// this method is only called if there is no user-supplied schema. if there is no
// user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
case _ =>
throw new AnalysisException(s"Data source is not readable: $name")
}
}

def asReadSupportWithSchema: ReadSupportWithSchema = {
source match {
case support: ReadSupportWithSchema =>
support
case _: ReadSupport =>
Contributor:
This is different from before: see https://github.com/apache/spark/pull/20387/files#diff-f70bda59304588cc3abfa3a9840653f4L214

Even if userSpecifiedSchema is not None, a source is still allowed to implement only ReadSupport, as long as its reader's schema is the same as the user-specified schema.

Contributor:
Another concern: this check should be done as early as possible so that we can fail earlier.

Contributor Author (@rdblue, Feb 12, 2018):
For your second concern about checking as early as possible: this check happens when the relation is first created, because projection is required and is always based on the schema returned by a reader. So it is already done as early as possible.

To be clearer about when this check should happen: I think the requirement is that it happens during job planning and, ideally, before filter push-down.

For the case where the user supplies a schema that is identical to the source's schema: I think this will cause confusion when source schemas change. Also, I can't think of a situation where it is a good idea to pass a schema that is ignored.

Here's an example of how this will be confusing: think of a job that supplies a schema identical to the table's schema and runs fine, so it goes into production. What happens when the table's schema changes? If someone adds a column to the table, then the job will start failing and report that the source doesn't support user-supplied schemas, even though it had previously worked just fine with a user-supplied schema. In addition, the change to the table is actually compatible with the old job because the new column will be removed by a projection.

To fix this situation, it may be tempting to use the user-supplied schema as an initial projection. But that doesn't make sense because we don't need two projection mechanisms. If we used this as a second way to project, it would be confusing that you can't actually leave out columns (at least for CSV) and it would be odd that using this path you can coerce types, which should usually be done by Spark.

I think it is best not to allow a user-supplied schema when it isn't supported by a source.
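
To make the failure mode concrete, here is a hypothetical example (the format name and schema are invented) of a job that works today and breaks when the source adds a column:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("schema-example").getOrCreate()

// Suppose "com.example.v2source" implements ReadSupport only, and today its schema is
// exactly (id: bigint, data: string), so passing the identical schema happens to work.
val userSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("data", StringType)))

val df = spark.read
  .format("com.example.v2source")
  .schema(userSchema)
  .load()

// If the source later adds a column, reader.readSchema() != userSchema, and this job
// starts failing with "does not allow user-specified schemas", even though the new
// column would have been dropped by a projection anyway.
```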

Contributor:
There was a historical reason we do this: #15046

I agree it's clearer not to allow this since data source v2 is brand new. But this change warrants a JIRA ticket and an individual PR; do you mind creating one? Or I can do that for you.

throw new AnalysisException(
s"Data source does not support user-supplied schema: $name")
case _ =>
throw new AnalysisException(s"Data source is not readable: $name")
}
}

def name: String = {
source match {
case registered: DataSourceRegister =>
registered.shortName()
case _ =>
source.getClass.getSimpleName
}
}
}

private def makeV2Options(options: Map[String, String]): DataSourceOptions = {
new DataSourceOptions(options.asJava)
}

private def schema(
source: DataSourceV2,
v2Options: DataSourceOptions,
userSchema: Option[StructType]): StructType = {
val reader = userSchema match {
// TODO: remove this case because it is confusing for users
case Some(s) if !source.isInstanceOf[ReadSupportWithSchema] =>
val reader = source.asReadSupport.createReader(v2Options)
if (reader.readSchema() != s) {
throw new AnalysisException(s"${source.name} does not allow user-specified schemas.")
}
reader
case Some(s) =>
source.asReadSupportWithSchema.createReader(s, v2Options)
case _ =>
source.asReadSupport.createReader(v2Options)
}
reader.readSchema()
}

def create(
source: DataSourceV2,
options: Map[String, String],
filters: Option[Seq[Expression]] = None,
userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
val projection = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes
DataSourceV2Relation(source, options, projection, filters,
// if the source does not implement ReadSupportWithSchema, then the userSpecifiedSchema must
// be equal to the reader's schema. the schema method enforces this. because the user schema
// and the reader's schema are identical, drop the user schema.
if (source.isInstanceOf[ReadSupportWithSchema]) userSpecifiedSchema else None)
}

private def pushRequiredColumns(reader: DataSourceReader, struct: StructType): Unit = {
reader match {
case projectionSupport: SupportsPushDownRequiredColumns =>
projectionSupport.pruneColumns(struct)
case _ =>
}
}

private def pushFilters(
reader: DataSourceReader,
filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
reader match {
case catalystFilterSupport: SupportsPushDownCatalystFilters =>
(
catalystFilterSupport.pushCatalystFilters(filters.toArray),
catalystFilterSupport.pushedCatalystFilters()
)

case filterSupport: SupportsPushDownFilters =>
// A map from original Catalyst expressions to corresponding translated data source
// filters. If a predicate is not in this map, it means it cannot be pushed down.
val translatedMap: Map[Expression, Filter] = filters.flatMap { p =>
DataSourceStrategy.translateFilter(p).map(f => p -> f)
}.toMap

// Catalyst predicate expressions that cannot be converted to data source filters.
val nonConvertiblePredicates = filters.filterNot(translatedMap.contains)

// Data source filters that cannot be pushed down. An unhandled filter means
// the data source cannot guarantee the rows returned can pass the filter.
// As a result we must return it so Spark can plan an extra filter operator.
val unhandledFilters = filterSupport.pushFilters(translatedMap.values.toArray).toSet
val (unhandledPredicates, pushedPredicates) = translatedMap.partition { case (_, f) =>
unhandledFilters.contains(f)
}

(nonConvertiblePredicates ++ unhandledPredicates.keys, pushedPredicates.keys.toSeq)

case _ => (filters, Nil)
}
}
}
@@ -23,8 +23,11 @@ import org.apache.spark.sql.execution.SparkPlan

object DataSourceV2Strategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case DataSourceV2Relation(output, reader) =>
DataSourceV2ScanExec(output, reader) :: Nil
case relation: DataSourceV2Relation =>
DataSourceV2ScanExec(relation.output, relation.reader) :: Nil

case relation: StreamingDataSourceV2Relation =>
DataSourceV2ScanExec(relation.output, relation.reader) :: Nil

case WriteToDataSourceV2(writer, query) =>
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil