[SPARK-23553][TESTS] Tests should not assume the default value of spark.sql.sources.default #20705

Closed · wants to merge 7 commits
4 changes: 2 additions & 2 deletions python/pyspark/sql/readwriter.py
@@ -147,8 +147,8 @@ def load(self, path=None, format=None, schema=None, **options):
or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
:param options: all other string options

>>> df = spark.read.load('python/test_support/sql/parquet_partitioned', opt1=True,
... opt2=1, opt3='str')
>>> df = spark.read.format("parquet").load('python/test_support/sql/parquet_partitioned',
... opt1=True, opt2=1, opt3='str')

@dongjoon-hyun (Member, Author) commented on Mar 1, 2018:

Unlike the other changes, this one differs slightly from the original semantics.
As an alternative, if we need to keep the original spark.read.load, we could add the following:

spark.conf.set("spark.sql.sources.default", "parquet")

>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]

@@ -2150,7 +2150,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {

test("data source table created in InMemoryCatalog should be able to read/write") {
withTable("tbl") {
sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
val provider = spark.sessionState.conf.defaultDataSourceName

@HyukjinKwon (Member) commented on Mar 3, 2018:

Hm, how about explicitly setting spark.sql.sources.default to parquet in all these places rather than reading the default? If it is set to, for example, text, this test fails. It seems a bit odd for a test to depend on a default value.
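
A minimal sketch of that alternative (illustrative only, not part of this diff), using the withSQLConf/withTable helpers from SQLTestUtils:

withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "parquet") {
  withTable("tbl") {
    // USING parquet stays hard-coded; the session default is pinned so the test
    // no longer depends on whatever spark.sql.sources.default happens to be.
    sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
    checkAnswer(sql("SELECT i, j FROM tbl"), Nil)
  }
}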


@dongjoon-hyun (Member, Author) replied:

This is SQLQuerySuite, and the test case correctly tests its purpose. Every data source has its own capabilities and limitations. Your example is just the text data source's limitation of supporting only a single-column schema, isn't it? The other sources (csv/json/orc/parquet) will pass this specific test.


@dongjoon-hyun (Member, Author) added:

So far, the purpose of this PR is to set the default once in SQLConf.scala in order to test a new data source and find its limitations, instead of touching every test suite.

BTW, spark.sql.sources.default=parquet doesn't help this existing code, because the SQL statement has the fixed string USING parquet.
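
For context, the single default mentioned here is declared in SQLConf.scala roughly as follows; the idea is that changing this one value to a new data source and rerunning the suites should surface that source's limitations without editing every suite:

// Approximate declaration of the default data source in SQLConf.scala.
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
  .doc("The default data source to use in input/output.")
  .stringConf
  .createWithDefault("parquet")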

sql(s"CREATE TABLE tbl(i INT, j STRING) USING $provider")
checkAnswer(sql("SELECT i, j FROM tbl"), Nil)

Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
@@ -2474,9 +2475,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {

test("SPARK-16975: Column-partition path starting '_' should be handled correctly") {
withTempDir { dir =>
val parquetDir = new File(dir, "parquet").getCanonicalPath
spark.range(10).withColumn("_col", $"id").write.partitionBy("_col").save(parquetDir)
spark.read.parquet(parquetDir)
val dataDir = new File(dir, "data").getCanonicalPath
spark.range(10).withColumn("_col", $"id").write.partitionBy("_col").save(dataDir)
spark.read.load(dataDir)
}
}

@@ -487,7 +487,10 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
withSQLConf("spark.sql.cbo.enabled" -> "true") {
// This test case depends on the size of parquet in statistics.
withSQLConf(
SQLConf.CBO_ENABLED.key -> "true",
SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "parquet") {
withTempPath { workDir =>
withTable("table1") {
val workDirPath = workDir.getAbsolutePath
@@ -154,10 +154,15 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")

val e = intercept[AnalysisException] {
Seq(5 -> "e").toDF("i", "j").write.mode("append").format("json").saveAsTable("t1")
val format = if (spark.sessionState.conf.defaultDataSourceName.equalsIgnoreCase("json")) {
"orc"
} else {
"json"
}
Seq(5 -> "e").toDF("i", "j").write.mode("append").format(format).saveAsTable("t1")
}
assert(e.message.contains("The format of the existing table default.t1 is " +
"`ParquetFileFormat`. It doesn't match the specified format `JsonFileFormat`."))
assert(e.message.contains("The format of the existing table default.t1 is "))
assert(e.message.contains("It doesn't match the specified format"))
}
}
}
@@ -57,6 +57,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
val timeZone = TimeZone.getDefault()
val timeZoneId = timeZone.getID

protected override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "parquet")

@dongjoon-hyun (Member, Author) commented:

Since this is ParquetPartitionDiscoverySuite, the test cases' assumption is legitimate.

}

protected override def afterAll(): Unit = {
spark.conf.unset(SQLConf.DEFAULT_DATA_SOURCE_NAME.key)
super.afterAll()
}

test("column type inference") {
def check(raw: String, literal: Literal, timeZone: TimeZone = timeZone): Unit = {
assert(inferPartitionColumnValue(raw, true, timeZone) === literal)
@@ -563,7 +563,8 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
"and a same-name temp view exist") {
withTable("same_name") {
withTempView("same_name") {
sql("CREATE TABLE same_name(id LONG) USING parquet")
val format = spark.sessionState.conf.defaultDataSourceName
sql(s"CREATE TABLE same_name(id LONG) USING $format")
spark.range(10).createTempView("same_name")
spark.range(20).write.mode(SaveMode.Append).saveAsTable("same_name")
checkAnswer(spark.table("same_name"), spark.range(10).toDF())
@@ -591,7 +591,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}

test("Pre insert nullability check (ArrayType)") {
withTable("arrayInParquet") {
withTable("array") {

A contributor commented:

It would be good, maybe in a future cleanup, to replace all these repeated string literals (e.g., "array" 7 times, "map" 7 times) with a variable name.
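
A possible shape for that cleanup, sketched with placeholders (df and expectedRows stand in for the DataFrames and Rows already present in the test):

// Hypothetical cleanup: name the table once and reuse the variable everywhere.
val tbl = "array"
withTable(tbl) {
  df.write.mode(SaveMode.Overwrite).saveAsTable(tbl)
  sparkSession.catalog.refreshTable(tbl)
  checkAnswer(sql(s"SELECT a FROM $tbl"), expectedRows)
}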

{
val df = (Tuple1(Seq(Int.box(1), null: Integer)) :: Nil).toDF("a")
val expectedSchema =
@@ -604,9 +604,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(df.schema === expectedSchema)

df.write
.format("parquet")
.mode(SaveMode.Overwrite)
.saveAsTable("arrayInParquet")
.saveAsTable("array")
}

{
@@ -621,25 +620,24 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(df.schema === expectedSchema)

df.write
.format("parquet")
.mode(SaveMode.Append)
.insertInto("arrayInParquet")
.insertInto("array")
}

(Tuple1(Seq(4, 5)) :: Nil).toDF("a")
.write
.mode(SaveMode.Append)
.saveAsTable("arrayInParquet") // This one internally calls df2.insertInto.
.saveAsTable("array") // This one internally calls df2.insertInto.

(Tuple1(Seq(Int.box(6), null: Integer)) :: Nil).toDF("a")
.write
.mode(SaveMode.Append)
.saveAsTable("arrayInParquet")
.saveAsTable("array")

sparkSession.catalog.refreshTable("arrayInParquet")
sparkSession.catalog.refreshTable("array")

checkAnswer(
sql("SELECT a FROM arrayInParquet"),
sql("SELECT a FROM array"),
Row(ArrayBuffer(1, null)) ::
Row(ArrayBuffer(2, 3)) ::
Row(ArrayBuffer(4, 5)) ::
Expand All @@ -648,7 +646,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}

test("Pre insert nullability check (MapType)") {
withTable("mapInParquet") {
withTable("map") {
{
val df = (Tuple1(Map(1 -> (null: Integer))) :: Nil).toDF("a")
val expectedSchema =
@@ -661,9 +659,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(df.schema === expectedSchema)

df.write
.format("parquet")
.mode(SaveMode.Overwrite)
.saveAsTable("mapInParquet")
.saveAsTable("map")
}

{
@@ -678,27 +675,24 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(df.schema === expectedSchema)

df.write
.format("parquet")
.mode(SaveMode.Append)
.insertInto("mapInParquet")
.insertInto("map")
}

(Tuple1(Map(4 -> 5)) :: Nil).toDF("a")
.write
.format("parquet")
.mode(SaveMode.Append)
.saveAsTable("mapInParquet") // This one internally calls df2.insertInto.
.saveAsTable("map") // This one internally calls df2.insertInto.

(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
.write
.format("parquet")
.mode(SaveMode.Append)
.saveAsTable("mapInParquet")
.saveAsTable("map")

sparkSession.catalog.refreshTable("mapInParquet")
sparkSession.catalog.refreshTable("map")

checkAnswer(
sql("SELECT a FROM mapInParquet"),
sql("SELECT a FROM map"),
Row(Map(1 -> null)) ::
Row(Map(2 -> 3)) ::
Row(Map(4 -> 5)) ::
@@ -852,52 +846,52 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
(from to to).map(i => i -> s"str$i").toDF("c1", "c2")
}

withTable("insertParquet") {
createDF(0, 9).write.format("parquet").saveAsTable("insertParquet")
withTable("t") {
createDF(0, 9).write.saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
sql("SELECT p.c1, p.c2 FROM t p WHERE p.c1 > 5"),
(6 to 9).map(i => Row(i, s"str$i")))

intercept[AnalysisException] {
createDF(10, 19).write.format("parquet").saveAsTable("insertParquet")
createDF(10, 19).write.saveAsTable("t")
}

createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
createDF(10, 19).write.mode(SaveMode.Append).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
sql("SELECT p.c1, p.c2 FROM t p WHERE p.c1 > 5"),
(6 to 19).map(i => Row(i, s"str$i")))

createDF(20, 29).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
createDF(20, 29).write.mode(SaveMode.Append).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"),
sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 25"),
(6 to 24).map(i => Row(i, s"str$i")))

intercept[AnalysisException] {
createDF(30, 39).write.saveAsTable("insertParquet")
createDF(30, 39).write.saveAsTable("t")
}

createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("insertParquet")
createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 35"),
(6 to 34).map(i => Row(i, s"str$i")))

createDF(40, 49).write.mode(SaveMode.Append).insertInto("insertParquet")
createDF(40, 49).write.mode(SaveMode.Append).insertInto("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 45"),
(6 to 44).map(i => Row(i, s"str$i")))

createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet")
createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"),
sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 51 AND p.c1 < 55"),
(52 to 54).map(i => Row(i, s"str$i")))
createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet")
createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p"),
sql("SELECT p.c1, c2 FROM t p"),
(50 to 59).map(i => Row(i, s"str$i")))

createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("insertParquet")
createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p"),
sql("SELECT p.c1, c2 FROM t p"),
(70 to 79).map(i => Row(i, s"str$i")))

A contributor commented:

Curious why the test named "SPARK-8156:create table to specific database by 'use dbname'" still has a hard-coded format of parquet. Is it perhaps testing functionality that is orthogonal to the format?

I changed the hard-coded format to json, orc, and csv, and each time that test passed.

Similarly with
Suite: org.apache.spark.sql.sources.SaveLoadSuite
Test: SPARK-23459: Improve error message when specified unknown column in partition columns


@dongjoon-hyun (Member, Author) commented on Mar 9, 2018:

That is because this PR minimally changes only the test cases that were causing failures. We cannot generalize all test cases across all modules in one huge PR; that would make it difficult to backport other commits. The main goal of this PR is to improve testability for new data sources.

For example, SPARK-8156:create table to specific database by 'use dbname' writes parquet but reads back with SQL, not with read.load, so it doesn't fail. It isn't a generalized test case, but it's not particularly harmful either.
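
A rough sketch of that pattern (hypothetical, not the actual SPARK-8156 test body): the write hard-codes parquet, but the verification reads the table back through SQL, so nothing depends on spark.sql.sources.default:

Seq(1 -> "a").toDF("i", "j").write.format("parquet").saveAsTable("t")
checkAnswer(sql("SELECT i, j FROM t"), Row(1, "a") :: Nil)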

}
}
@@ -37,11 +37,11 @@ class PartitionProviderCompatibilitySuite
spark.range(5).selectExpr("id as fieldOne", "id as partCol").write
.partitionBy("partCol")
.mode("overwrite")
.parquet(dir.getAbsolutePath)
.save(dir.getAbsolutePath)

spark.sql(s"""
|create table $tableName (fieldOne long, partCol int)
|using parquet
|using ${spark.sessionState.conf.defaultDataSourceName}
|options (path "${dir.toURI}")
|partitioned by (partCol)""".stripMargin)
}
@@ -358,7 +358,7 @@ class PartitionProviderCompatibilitySuite
try {
spark.sql(s"""
|create table test (id long, P1 int, P2 int)
|using parquet
|using ${spark.sessionState.conf.defaultDataSourceName}
|options (path "${base.toURI}")
|partitioned by (P1, P2)""".stripMargin)
spark.sql(s"alter table test add partition (P1=0, P2=0) location '${a.toURI}'")
@@ -417,7 +417,7 @@ class PartitionedTablePerfStatsSuite
import spark.implicits._
Seq(1).toDF("a").write.mode("overwrite").save(dir.getAbsolutePath)
HiveCatalogMetrics.reset()
spark.read.parquet(dir.getAbsolutePath)
spark.read.load(dir.getAbsolutePath)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1)
}
@@ -1658,8 +1658,8 @@ class HiveDDLSuite
Seq(5 -> "e").toDF("i", "j")
.write.format("hive").mode("append").saveAsTable("t1")
}
assert(e.message.contains("The format of the existing table default.t1 is " +
"`ParquetFileFormat`. It doesn't match the specified format `HiveFileFormat`."))
assert(e.message.contains("The format of the existing table default.t1 is "))
assert(e.message.contains("It doesn't match the specified format `HiveFileFormat`."))
}
}

@@ -1709,11 +1709,12 @@ class HiveDDLSuite
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tblName)).schema.map(_.name)
}

val provider = spark.sessionState.conf.defaultDataSourceName
withTable("t", "t1", "t2", "t3", "t4", "t5", "t6") {
sql("CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)")
sql(s"CREATE TABLE t(a int, b int, c int, d int) USING $provider PARTITIONED BY (d, b)")
assert(getTableColumns("t") == Seq("a", "c", "d", "b"))

sql("CREATE TABLE t1 USING parquet PARTITIONED BY (d, b) AS SELECT 1 a, 1 b, 1 c, 1 d")
sql(s"CREATE TABLE t1 USING $provider PARTITIONED BY (d, b) AS SELECT 1 a, 1 b, 1 c, 1 d")
assert(getTableColumns("t1") == Seq("a", "c", "d", "b"))

Seq((1, 1, 1, 1)).toDF("a", "b", "c", "d").write.partitionBy("d", "b").saveAsTable("t2")
Expand All @@ -1723,7 +1724,7 @@ class HiveDDLSuite
val dataPath = new File(new File(path, "d=1"), "b=1").getCanonicalPath
Seq(1 -> 1).toDF("a", "c").write.save(dataPath)

sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.toURI}'")
sql(s"CREATE TABLE t3 USING $provider LOCATION '${path.toURI}'")
assert(getTableColumns("t3") == Seq("a", "c", "d", "b"))
}

@@ -516,24 +516,19 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
test("CTAS with default fileformat") {
val table = "ctas1"
val ctas = s"CREATE TABLE IF NOT EXISTS $table SELECT key k, value FROM src"
withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
withSQLConf("hive.default.fileformat" -> "textfile") {
Seq("orc", "parquet").foreach { dataSourceFormat =>
withSQLConf(
SQLConf.CONVERT_CTAS.key -> "true",
SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> dataSourceFormat,
"hive.default.fileformat" -> "textfile") {
withTable(table) {
sql(ctas)
// We should use parquet here as that is the default datasource fileformat. The default
// datasource file format is controlled by `spark.sql.sources.default` configuration.
// The default datasource file format is controlled by `spark.sql.sources.default`.
// This testcase verifies that setting `hive.default.fileformat` has no impact on
// the target table's fileformat in case of CTAS.
assert(sessionState.conf.defaultDataSourceName === "parquet")
checkRelation(tableName = table, isDataSourceTable = true, format = "parquet")
checkRelation(tableName = table, isDataSourceTable = true, format = dataSourceFormat)
}
}

@dongjoon-hyun (Member, Author) commented:

Previously, spark.sql.sources.default=orc with hive.default.fileformat=textfile was not tested properly.

withSQLConf("spark.sql.sources.default" -> "orc") {
withTable(table) {
sql(ctas)
checkRelation(tableName = table, isDataSourceTable = true, format = "orc")
}
}
}
}
