Commit

address comment
cloud-fan committed Jan 30, 2020
1 parent 11f54f6 commit 2ceb46e
Showing 3 changed files with 8 additions and 11 deletions.
@@ -219,15 +219,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
             dsOptions)
           (catalog.loadTable(ident), Some(catalog), Some(ident))
         case _ =>
-<<<<<<< HEAD
           // TODO: Non-catalog paths for DSV2 are currently not well defined.
-          userSpecifiedSchema match {
-            case Some(schema) => (provider.getTable(dsOptions, schema), None, None)
-            case _ => (provider.getTable(dsOptions), None, None)
-          }
-=======
-          DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema)
->>>>>>> refine TableProvider
+          val tbl = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema)
+          (tbl, None, None)
       }
       import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
       table match {
@@ -263,8 +263,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         // query schema is not compatible with the existing data, the write can still success but
         // following reads would fail.
         if (provider.isInstanceOf[FileDataSourceV2]) {
-          import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-          provider.getTable(df.schema, partitioningAsV2, dsOptions.asCaseSensitiveMap())
+          provider.getTable(
+            df.schema.asNullable,
+            partitioningAsV2.toArray,
+            dsOptions.asCaseSensitiveMap())
         } else {
           DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema = None)
         }
@@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRead, Table,
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns, V1Scan}
 import org.apache.spark.sql.execution.RowDataSourceScanExec
+import org.apache.spark.sql.internal.connector.SimpleTableProvider
 import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThan, TableScan}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
@@ -120,7 +121,7 @@ object V1ReadFallbackCatalog {
   val schema = new StructType().add("i", "int").add("j", "int")
 }

-class V1ReadFallbackTableProvider extends TableProvider {
+class V1ReadFallbackTableProvider extends SimpleTableProvider {
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     new TableWithV1ReadFallback("v1-read-fallback")
   }
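Both changed call sites above now go through DataSourceV2Utils.getTableFromProvider. For orientation only, below is a minimal sketch of what such a helper can look like, assuming the three-argument getTable(schema, partitioning, properties) overload and the inferSchema/inferPartitioning methods implied by the calls in this diff; the object name and the absence of error handling are illustrative, not the actual Spark implementation.

// Illustrative sketch only. getTable(schema, partitioning, properties),
// inferSchema and inferPartitioning are assumed from the calls visible in
// this diff; the real DataSourceV2Utils may differ (for example, it should
// reject a user-specified schema when the provider cannot accept one).
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object GetTableFromProviderSketch {
  def getTableFromProvider(
      provider: TableProvider,
      options: CaseInsensitiveStringMap,
      userSpecifiedSchema: Option[StructType]): Table = {
    userSpecifiedSchema match {
      case Some(schema) =>
        // Use the schema supplied by the caller; let the provider infer partitioning.
        provider.getTable(schema, provider.inferPartitioning(options), options.asCaseSensitiveMap())
      case None =>
        // No user schema: ask the provider to infer both schema and partitioning.
        provider.getTable(
          provider.inferSchema(options),
          provider.inferPartitioning(options),
          options.asCaseSensitiveMap())
    }
  }
}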
