[SPARK-23553][TESTS] Tests should not assume the default value of `spark.sql.sources.default`

## What changes were proposed in this pull request?

Currently, some tests assume that `spark.sql.sources.default=parquet`. That assumption happens to be correct today, but it makes it difficult to test new data source formats.

This PR aims to
- Make the test suites more robust and easy to reuse when testing new data sources in the future.
- Test the new native ORC data source against the full existing Apache Spark test coverage.

As an example, this PR uses `spark.sql.sources.default=orc` during review; the value should be reverted to `parquet` when the PR is accepted. A sketch of the resulting test pattern is shown below.
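The updated suites follow a simple pattern: read the configured default provider from the session state instead of hardcoding `parquet`, and pin `spark.sql.sources.default` explicitly only where an assertion is genuinely format-specific. The following is only an illustrative sketch, not code from this diff; the suite name `ExampleSuite` and the table name `tbl` are hypothetical.

```scala
// Illustrative sketch only; ExampleSuite is hypothetical and not part of this PR.
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

class ExampleSuite extends QueryTest with SharedSQLContext {
  import testImplicits._

  test("round-trip through the configured default data source") {
    // Resolve whatever `spark.sql.sources.default` is set to instead of assuming parquet.
    val provider = spark.sessionState.conf.defaultDataSourceName
    withTable("tbl") {
      sql(s"CREATE TABLE tbl(i INT, j STRING) USING $provider")
      Seq(1 -> "a").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
      checkAnswer(sql("SELECT i, j FROM tbl"), Row(1, "a") :: Nil)
    }
  }

  test("assertion that only holds for Parquet") {
    // Pin the default explicitly when the expected behavior is format-specific
    // (e.g. statistics sizes), so the test stays valid if the global default changes.
    withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "parquet") {
      // ... format-specific checks ...
    }
  }
}
```

Suites whose fixtures require Parquet throughout (for example the partition discovery tests in the diff) pin the config in `beforeAll`/`afterAll` instead of per test.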

## How was this patch tested?

Pass Jenkins with the updated tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20705 from dongjoon-hyun/SPARK-23553.

(cherry picked from commit 5414abc)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
dongjoon-hyun authored and gatorsmile committed Mar 16, 2018
1 parent d9e1f70 commit 21b6de4
Showing 11 changed files with 81 additions and 71 deletions.
4 changes: 2 additions & 2 deletions python/pyspark/sql/readwriter.py
@@ -147,8 +147,8 @@ def load(self, path=None, format=None, schema=None, **options):
or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
:param options: all other string options
>>> df = spark.read.load('python/test_support/sql/parquet_partitioned', opt1=True,
... opt2=1, opt3='str')
>>> df = spark.read.format("parquet").load('python/test_support/sql/parquet_partitioned',
... opt1=True, opt2=1, opt3='str')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
@@ -2148,7 +2148,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {

test("data source table created in InMemoryCatalog should be able to read/write") {
withTable("tbl") {
sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
val provider = spark.sessionState.conf.defaultDataSourceName
sql(s"CREATE TABLE tbl(i INT, j STRING) USING $provider")
checkAnswer(sql("SELECT i, j FROM tbl"), Nil)

Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
@@ -2472,9 +2473,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {

test("SPARK-16975: Column-partition path starting '_' should be handled correctly") {
withTempDir { dir =>
val parquetDir = new File(dir, "parquet").getCanonicalPath
spark.range(10).withColumn("_col", $"id").write.partitionBy("_col").save(parquetDir)
spark.read.parquet(parquetDir)
val dataDir = new File(dir, "data").getCanonicalPath
spark.range(10).withColumn("_col", $"id").write.partitionBy("_col").save(dataDir)
spark.read.load(dataDir)
}
}

@@ -487,7 +487,10 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
withSQLConf("spark.sql.cbo.enabled" -> "true") {
// This test case depends on the size of parquet in statistics.
withSQLConf(
SQLConf.CBO_ENABLED.key -> "true",
SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "parquet") {
withTempPath { workDir =>
withTable("table1") {
val workDirPath = workDir.getAbsolutePath
@@ -154,10 +154,15 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")

val e = intercept[AnalysisException] {
Seq(5 -> "e").toDF("i", "j").write.mode("append").format("json").saveAsTable("t1")
val format = if (spark.sessionState.conf.defaultDataSourceName.equalsIgnoreCase("json")) {
"orc"
} else {
"json"
}
Seq(5 -> "e").toDF("i", "j").write.mode("append").format(format).saveAsTable("t1")
}
assert(e.message.contains("The format of the existing table default.t1 is " +
"`ParquetFileFormat`. It doesn't match the specified format `JsonFileFormat`."))
assert(e.message.contains("The format of the existing table default.t1 is "))
assert(e.message.contains("It doesn't match the specified format"))
}
}
}
@@ -57,6 +57,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
val timeZone = TimeZone.getDefault()
val timeZoneId = timeZone.getID

protected override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "parquet")
}

protected override def afterAll(): Unit = {
spark.conf.unset(SQLConf.DEFAULT_DATA_SOURCE_NAME.key)
super.afterAll()
}

test("column type inference") {
def check(raw: String, literal: Literal, timeZone: TimeZone = timeZone): Unit = {
assert(inferPartitionColumnValue(raw, true, timeZone) === literal)
@@ -563,7 +563,8 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
"and a same-name temp view exist") {
withTable("same_name") {
withTempView("same_name") {
sql("CREATE TABLE same_name(id LONG) USING parquet")
val format = spark.sessionState.conf.defaultDataSourceName
sql(s"CREATE TABLE same_name(id LONG) USING $format")
spark.range(10).createTempView("same_name")
spark.range(20).write.mode(SaveMode.Append).saveAsTable("same_name")
checkAnswer(spark.table("same_name"), spark.range(10).toDF())
@@ -591,7 +591,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}

test("Pre insert nullability check (ArrayType)") {
withTable("arrayInParquet") {
withTable("array") {
{
val df = (Tuple1(Seq(Int.box(1), null: Integer)) :: Nil).toDF("a")
val expectedSchema =
@@ -604,9 +604,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(df.schema === expectedSchema)

df.write
.format("parquet")
.mode(SaveMode.Overwrite)
.saveAsTable("arrayInParquet")
.saveAsTable("array")
}

{
@@ -621,25 +620,24 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(df.schema === expectedSchema)

df.write
.format("parquet")
.mode(SaveMode.Append)
.insertInto("arrayInParquet")
.insertInto("array")
}

(Tuple1(Seq(4, 5)) :: Nil).toDF("a")
.write
.mode(SaveMode.Append)
.saveAsTable("arrayInParquet") // This one internally calls df2.insertInto.
.saveAsTable("array") // This one internally calls df2.insertInto.

(Tuple1(Seq(Int.box(6), null: Integer)) :: Nil).toDF("a")
.write
.mode(SaveMode.Append)
.saveAsTable("arrayInParquet")
.saveAsTable("array")

sparkSession.catalog.refreshTable("arrayInParquet")
sparkSession.catalog.refreshTable("array")

checkAnswer(
sql("SELECT a FROM arrayInParquet"),
sql("SELECT a FROM array"),
Row(ArrayBuffer(1, null)) ::
Row(ArrayBuffer(2, 3)) ::
Row(ArrayBuffer(4, 5)) ::
@@ -648,7 +646,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}

test("Pre insert nullability check (MapType)") {
withTable("mapInParquet") {
withTable("map") {
{
val df = (Tuple1(Map(1 -> (null: Integer))) :: Nil).toDF("a")
val expectedSchema =
@@ -661,9 +659,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(df.schema === expectedSchema)

df.write
.format("parquet")
.mode(SaveMode.Overwrite)
.saveAsTable("mapInParquet")
.saveAsTable("map")
}

{
@@ -678,27 +675,24 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(df.schema === expectedSchema)

df.write
.format("parquet")
.mode(SaveMode.Append)
.insertInto("mapInParquet")
.insertInto("map")
}

(Tuple1(Map(4 -> 5)) :: Nil).toDF("a")
.write
.format("parquet")
.mode(SaveMode.Append)
.saveAsTable("mapInParquet") // This one internally calls df2.insertInto.
.saveAsTable("map") // This one internally calls df2.insertInto.

(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
.write
.format("parquet")
.mode(SaveMode.Append)
.saveAsTable("mapInParquet")
.saveAsTable("map")

sparkSession.catalog.refreshTable("mapInParquet")
sparkSession.catalog.refreshTable("map")

checkAnswer(
sql("SELECT a FROM mapInParquet"),
sql("SELECT a FROM map"),
Row(Map(1 -> null)) ::
Row(Map(2 -> 3)) ::
Row(Map(4 -> 5)) ::
@@ -852,52 +846,52 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
(from to to).map(i => i -> s"str$i").toDF("c1", "c2")
}

withTable("insertParquet") {
createDF(0, 9).write.format("parquet").saveAsTable("insertParquet")
withTable("t") {
createDF(0, 9).write.saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
sql("SELECT p.c1, p.c2 FROM t p WHERE p.c1 > 5"),
(6 to 9).map(i => Row(i, s"str$i")))

intercept[AnalysisException] {
createDF(10, 19).write.format("parquet").saveAsTable("insertParquet")
createDF(10, 19).write.saveAsTable("t")
}

createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
createDF(10, 19).write.mode(SaveMode.Append).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
sql("SELECT p.c1, p.c2 FROM t p WHERE p.c1 > 5"),
(6 to 19).map(i => Row(i, s"str$i")))

createDF(20, 29).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
createDF(20, 29).write.mode(SaveMode.Append).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"),
sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 25"),
(6 to 24).map(i => Row(i, s"str$i")))

intercept[AnalysisException] {
createDF(30, 39).write.saveAsTable("insertParquet")
createDF(30, 39).write.saveAsTable("t")
}

createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("insertParquet")
createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 35"),
(6 to 34).map(i => Row(i, s"str$i")))

createDF(40, 49).write.mode(SaveMode.Append).insertInto("insertParquet")
createDF(40, 49).write.mode(SaveMode.Append).insertInto("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 45"),
(6 to 44).map(i => Row(i, s"str$i")))

createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet")
createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"),
sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 51 AND p.c1 < 55"),
(52 to 54).map(i => Row(i, s"str$i")))
createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet")
createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p"),
sql("SELECT p.c1, c2 FROM t p"),
(50 to 59).map(i => Row(i, s"str$i")))

createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("insertParquet")
createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p"),
sql("SELECT p.c1, c2 FROM t p"),
(70 to 79).map(i => Row(i, s"str$i")))
}
}
@@ -37,11 +37,11 @@ class PartitionProviderCompatibilitySuite
spark.range(5).selectExpr("id as fieldOne", "id as partCol").write
.partitionBy("partCol")
.mode("overwrite")
.parquet(dir.getAbsolutePath)
.save(dir.getAbsolutePath)

spark.sql(s"""
|create table $tableName (fieldOne long, partCol int)
|using parquet
|using ${spark.sessionState.conf.defaultDataSourceName}
|options (path "${dir.toURI}")
|partitioned by (partCol)""".stripMargin)
}
@@ -358,7 +358,7 @@ class PartitionProviderCompatibilitySuite
try {
spark.sql(s"""
|create table test (id long, P1 int, P2 int)
|using parquet
|using ${spark.sessionState.conf.defaultDataSourceName}
|options (path "${base.toURI}")
|partitioned by (P1, P2)""".stripMargin)
spark.sql(s"alter table test add partition (P1=0, P2=0) location '${a.toURI}'")
@@ -417,7 +417,7 @@ class PartitionedTablePerfStatsSuite
import spark.implicits._
Seq(1).toDF("a").write.mode("overwrite").save(dir.getAbsolutePath)
HiveCatalogMetrics.reset()
spark.read.parquet(dir.getAbsolutePath)
spark.read.load(dir.getAbsolutePath)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1)
}
@@ -1658,8 +1658,8 @@ class HiveDDLSuite
Seq(5 -> "e").toDF("i", "j")
.write.format("hive").mode("append").saveAsTable("t1")
}
assert(e.message.contains("The format of the existing table default.t1 is " +
"`ParquetFileFormat`. It doesn't match the specified format `HiveFileFormat`."))
assert(e.message.contains("The format of the existing table default.t1 is "))
assert(e.message.contains("It doesn't match the specified format `HiveFileFormat`."))
}
}

@@ -1709,11 +1709,12 @@ class HiveDDLSuite
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tblName)).schema.map(_.name)
}

val provider = spark.sessionState.conf.defaultDataSourceName
withTable("t", "t1", "t2", "t3", "t4", "t5", "t6") {
sql("CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)")
sql(s"CREATE TABLE t(a int, b int, c int, d int) USING $provider PARTITIONED BY (d, b)")
assert(getTableColumns("t") == Seq("a", "c", "d", "b"))

sql("CREATE TABLE t1 USING parquet PARTITIONED BY (d, b) AS SELECT 1 a, 1 b, 1 c, 1 d")
sql(s"CREATE TABLE t1 USING $provider PARTITIONED BY (d, b) AS SELECT 1 a, 1 b, 1 c, 1 d")
assert(getTableColumns("t1") == Seq("a", "c", "d", "b"))

Seq((1, 1, 1, 1)).toDF("a", "b", "c", "d").write.partitionBy("d", "b").saveAsTable("t2")
@@ -1723,7 +1724,7 @@ class HiveDDLSuite
val dataPath = new File(new File(path, "d=1"), "b=1").getCanonicalPath
Seq(1 -> 1).toDF("a", "c").write.save(dataPath)

sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.toURI}'")
sql(s"CREATE TABLE t3 USING $provider LOCATION '${path.toURI}'")
assert(getTableColumns("t3") == Seq("a", "c", "d", "b"))
}

@@ -516,24 +516,19 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
test("CTAS with default fileformat") {
val table = "ctas1"
val ctas = s"CREATE TABLE IF NOT EXISTS $table SELECT key k, value FROM src"
withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
withSQLConf("hive.default.fileformat" -> "textfile") {
Seq("orc", "parquet").foreach { dataSourceFormat =>
withSQLConf(
SQLConf.CONVERT_CTAS.key -> "true",
SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> dataSourceFormat,
"hive.default.fileformat" -> "textfile") {
withTable(table) {
sql(ctas)
// We should use parquet here as that is the default datasource fileformat. The default
// datasource file format is controlled by `spark.sql.sources.default` configuration.
// The default datasource file format is controlled by `spark.sql.sources.default`.
// This testcase verifies that setting `hive.default.fileformat` has no impact on
// the target table's fileformat in case of CTAS.
assert(sessionState.conf.defaultDataSourceName === "parquet")
checkRelation(tableName = table, isDataSourceTable = true, format = "parquet")
checkRelation(tableName = table, isDataSourceTable = true, format = dataSourceFormat)
}
}
withSQLConf("spark.sql.sources.default" -> "orc") {
withTable(table) {
sql(ctas)
checkRelation(tableName = table, isDataSourceTable = true, format = "orc")
}
}
}
}
