Commit 11f54f6: refine TableProvider
cloud-fan committed Jan 30, 2020
1 parent bda0669
Showing 35 changed files with 306 additions and 112 deletions.
@@ -36,6 +36,7 @@ import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBat
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.internal.connector.SimpleTableProvider
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -51,7 +52,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with StreamSinkProvider
with RelationProvider
with CreatableRelationProvider
with TableProvider
with SimpleTableProvider
with Logging {
import KafkaSourceProvider._

@@ -17,7 +17,10 @@

package org.apache.spark.sql.connector.catalog;

import java.util.Map;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

@@ -36,26 +39,50 @@
public interface TableProvider {

/**
* Return a {@link Table} instance to do read/write with user-specified options.
* Infer the schema of the table identified by the given options.
*
* @param options an immutable case-insensitive string-to-string map that can identify a table,
* e.g. file path, Kafka topic name, etc.
*/
StructType inferSchema(CaseInsensitiveStringMap options);

/**
* Infer the partitioning of the table identified by the given options.
* <p>
* By default this method returns empty partitioning. Please override it if this source supports
* partitioning.
*
* @param options an immutable case-insensitive string-to-string map that can identify a table,
* e.g. file path, Kafka topic name, etc.
*/
default Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
return new Transform[0];
}

/**
* Return a {@link Table} instance with the specified table schema, partitioning and properties
* to do read/write. The returned table should report the same schema and partitioning as the
* specified ones, or Spark may fail the operation.
*
* @param options the user-specified options that can identify a table, e.g. file path, Kafka
* topic name, etc. It's an immutable case-insensitive string-to-string map.
* @param schema The specified table schema.
* @param partitioning The specified table partitioning.
* @param properties The specified table properties. It's case preserving (contains exactly what
* users specified) and implementations are free to use it case sensitively or
* insensitively. It should be able to identify a table, e.g. file path, Kafka
* topic name, etc.
*/
Table getTable(CaseInsensitiveStringMap options);
Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties);

/**
* Return a {@link Table} instance to do read/write with user-specified schema and options.
* Returns true if the source can accept external table metadata when getting tables. The
* external table metadata includes user-specified schema from
* `DataFrameReader`/`DataStreamReader` and schema/partitioning stored in Spark catalog.
* <p>
* By default this method throws {@link UnsupportedOperationException}, implementations should
* override this method to handle user-specified schema.
* </p>
* @param options the user-specified options that can identify a table, e.g. file path, Kafka
* topic name, etc. It's an immutable case-insensitive string-to-string map.
* @param schema the user-specified schema.
* @throws UnsupportedOperationException
* By default this method returns false, which means the schema and partitioning passed to
* `getTable` are from the infer methods. Please override it if this source has expensive
* schema/partitioning inference and wants external table metadata to avoid inference.
*/
default Table getTable(CaseInsensitiveStringMap options, StructType schema) {
throw new UnsupportedOperationException(
this.getClass().getSimpleName() + " source does not support user-specified schema");
default boolean supportsExternalMetadata() {
return false;
}
}
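
For context beyond this diff, a minimal Scala sketch of a custom source implementing the refined TableProvider contract could look as follows; MyJsonLinesProvider and MyJsonLinesTable are hypothetical names, not classes touched by this commit:

import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class MyJsonLinesProvider extends TableProvider {

  // Cheap inference: a fixed single-column schema. A source with expensive
  // inference should also return true from supportsExternalMetadata() so that
  // Spark can pass user-specified or catalog-stored metadata in instead.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    new StructType().add("value", StringType)

  // The default inferPartitioning() (empty) is kept: this source is unpartitioned.

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table =
    new MyJsonLinesTable(schema, properties) // hypothetical Table implementation

  // Accept schema/partitioning coming from DataFrameReader.schema(...) or a
  // catalog, so the infer methods can be skipped on those paths.
  override def supportsExternalMetadata(): Boolean = true
}
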
@@ -21,17 +21,16 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.connector.expressions.{BucketTransform, IdentityTransform, LogicalExpressions, Transform}
import org.apache.spark.sql.types.StructType

/**
* Conversion helpers for working with v2 [[CatalogPlugin]].
*/
private[sql] object CatalogV2Implicits {
import LogicalExpressions._

implicit class PartitionTypeHelper(partitionType: StructType) {
implicit class PartitionTypeHelper(colNames: Seq[String]) {
def asTransforms: Array[Transform] = {
partitionType.names.map(col => identity(reference(Seq(col)))).toArray
colNames.map(col => identity(reference(Seq(col)))).toArray
}
}

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.internal.connector

import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// A simple version of `TableProvider` which doesn't support user-specified table schema/partitioning
// and treats table properties case-insensitively. This is private and only used in built-in sources.
trait SimpleTableProvider extends TableProvider {

def getTable(options: CaseInsensitiveStringMap): Table

private[this] var loadedTable: Table = _
private def getOrLoadTable(options: CaseInsensitiveStringMap): Table = {
if (loadedTable == null) loadedTable = getTable(options)
loadedTable
}

override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
getOrLoadTable(options).schema()
}

override def getTable(
schema: StructType,
partitioning: Array[Transform],
properties: util.Map[String, String]): Table = {
assert(partitioning.isEmpty)
getOrLoadTable(new CaseInsensitiveStringMap(properties))
}
}
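
As an illustration outside this commit, a built-in style source based on this trait only implements the single-argument getTable; inferSchema and the three-argument getTable fall out of the cached getOrLoadTable call above. MyRateSourceProvider and MyRateTable are hypothetical names:

import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.internal.connector.SimpleTableProvider
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class MyRateSourceProvider extends SimpleTableProvider with DataSourceRegister {
  override def shortName(): String = "my-rate"

  // The only method this source needs to provide; the trait loads the table
  // once and reuses it for both schema inference and the actual read/write.
  override def getTable(options: CaseInsensitiveStringMap): Table =
    new MyRateTable(options) // hypothetical Table implementation
}
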
@@ -219,11 +219,15 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
dsOptions)
(catalog.loadTable(ident), Some(catalog), Some(ident))
case _ =>
// TODO: Non-catalog paths for DSV2 are currently not well defined.
userSpecifiedSchema match {
case Some(schema) => (provider.getTable(dsOptions, schema), None, None)
case _ => (provider.getTable(dsOptions), None, None)
}
DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema)
}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
table match {
21 changes: 17 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -257,6 +257,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val options = sessionOptions ++ extraOptions
val dsOptions = new CaseInsensitiveStringMap(options.asJava)

def getTable: Table = {
// For file sources, it's expensive to infer the schema/partitioning at each write. Here we pass
// the schema of the input query and the user-specified partitioning to `getTable`. If the
// query schema is not compatible with the existing data, the write can still succeed, but
// subsequent reads would fail.
if (provider.isInstanceOf[FileDataSourceV2]) {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
provider.getTable(df.schema, partitioningAsV2, dsOptions.asCaseSensitiveMap())
} else {
DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema = None)
}
}

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val catalogManager = df.sparkSession.sessionState.catalogManager
mode match {
@@ -268,8 +281,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
supportsExtract, catalogManager, dsOptions)

(catalog.loadTable(ident), Some(catalog), Some(ident))
case tableProvider: TableProvider =>
val t = tableProvider.getTable(dsOptions)
case _: TableProvider =>
val t = getTable
if (t.supports(BATCH_WRITE)) {
(t, None, None)
} else {
@@ -314,8 +327,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
extraOptions.toMap,
ignoreIfExists = createMode == SaveMode.Ignore)
}
case tableProvider: TableProvider =>
if (tableProvider.getTable(dsOptions).supports(BATCH_WRITE)) {
case _: TableProvider =>
if (getTable.supports(BATCH_WRITE)) {
throw new AnalysisException(s"TableProvider implementation $source cannot be " +
s"written with $createMode mode, please use Append or Overwrite " +
"modes instead.")
@@ -22,9 +22,10 @@ import java.util
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.internal.connector.SimpleTableProvider
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -33,7 +34,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* This is a no-op datasource. It does not do anything besides consuming its input.
* This can be useful for benchmarking or to cache data without any additional overhead.
*/
class NoopDataSource extends TableProvider with DataSourceRegister {
class NoopDataSource extends SimpleTableProvider with DataSourceRegister {
override def shortName(): String = "noop"
override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable
}
@@ -20,8 +20,10 @@ package org.apache.spark.sql.execution.datasources.v2
import java.util.regex.Pattern

import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, TableProvider}
import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table, TableProvider}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

private[sql] object DataSourceV2Utils extends Logging {

@@ -57,4 +59,28 @@ private[sql] object DataSourceV2Utils extends Logging {
case _ => Map.empty
}
}

def getTableFromProvider(
provider: TableProvider,
options: CaseInsensitiveStringMap,
userSpecifiedSchema: Option[StructType]): Table = {
userSpecifiedSchema match {
case Some(schema) =>
if (provider.supportsExternalMetadata()) {
provider.getTable(
schema,
provider.inferPartitioning(options),
options.asCaseSensitiveMap())
} else {
throw new UnsupportedOperationException(
s"${provider.getClass.getSimpleName} source does not support user-specified schema.")
}

case None =>
provider.getTable(
provider.inferSchema(options),
provider.inferPartitioning(options),
options.asCaseSensitiveMap())
}
}
}
@@ -16,13 +16,17 @@
*/
package org.apache.spark.sql.execution.datasources.v2

import java.util

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.TableProvider
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

@@ -59,4 +63,40 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
}

// TODO: To reduce code diff of SPARK-29665, we create stub implementations for file source v2, so
// that we don't need to touch all the file source v2 classes. We should remove the stub
// implementation and directly implement the TableProvider APIs.
protected def getTable(options: CaseInsensitiveStringMap): Table
protected def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
throw new UnsupportedOperationException("user-specified schema")
}

override def supportsExternalMetadata(): Boolean = true

private var t: Table = null

override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
if (t == null) t = getTable(options)
t.schema()
}

// TODO: implement a light-weight partition inference which only looks at the path of one leaf
// file and returns partition column names. For now the partition inference happens in
// `getTable`, because we don't know the user-specified schema here.
override def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = {
Array.empty
}

override def getTable(
schema: StructType,
partitioning: Array[Transform],
properties: util.Map[String, String]): Table = {
// If the table is already loaded during schema inference, return it directly.
if (t != null) {
t
} else {
getTable(new CaseInsensitiveStringMap(properties), schema)
}
}
}
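
For illustration only (not part of this commit), a file source built on these stubs might look like the sketch below; MyTextDataSourceV2, MyTextTable and MyTextFileFormat are hypothetical, and sparkSession/getPaths are assumed to be the helpers already defined on this trait:

import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class MyTextDataSourceV2 extends FileDataSourceV2 {
  override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[MyTextFileFormat]

  override def shortName(): String = "my-text"

  // Schema/partition inference happens inside the returned table, as noted in
  // the TODO above. MyTextTable is a hypothetical FileTable subclass.
  override protected def getTable(options: CaseInsensitiveStringMap): Table =
    MyTextTable(sparkSession, options, getPaths(options), userSpecifiedSchema = None)

  // Honor a schema supplied by DataFrameReader or stored in a catalog instead
  // of inferring it from the files.
  override protected def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table =
    MyTextTable(sparkSession, options, getPaths(options), userSpecifiedSchema = Some(schema))
}
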
@@ -102,7 +102,7 @@ abstract class FileTable(
StructType(fields)
}

override def partitioning: Array[Transform] = fileIndex.partitionSchema.asTransforms
override def partitioning: Array[Transform] = fileIndex.partitionSchema.names.toSeq.asTransforms

override def properties: util.Map[String, String] = options.asCaseSensitiveMap

@@ -22,10 +22,11 @@ import java.util
import scala.collection.JavaConverters._

import org.apache.spark.sql._
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
import org.apache.spark.sql.internal.connector.SimpleTableProvider
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -35,7 +36,7 @@ case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
override def schema: StructType = data.schema
}

class ConsoleSinkProvider extends TableProvider
class ConsoleSinkProvider extends SimpleTableProvider
with DataSourceRegister
with CreatableRelationProvider {

@@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream, Offset => OffsetV2, SparkDataStream}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.connector.SimpleTableProvider
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -94,7 +95,7 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Spa

// This class is used to indicate the memory stream data source. We don't actually use it, as
// memory stream is for test only and we never look it up by name.
object MemoryStreamTableProvider extends TableProvider {
object MemoryStreamTableProvider extends SimpleTableProvider {
override def getTable(options: CaseInsensitiveStringMap): Table = {
throw new IllegalStateException("MemoryStreamTableProvider should not be used.")
}