[SPARK-23305][SQL][TEST] Test spark.sql.files.ignoreMissingFiles for all file-based data sources

## What changes were proposed in this pull request?

Like Parquet, all file-based data sources handle `spark.sql.files.ignoreMissingFiles` correctly. We should add test coverage for all data sources to verify feature parity and to prevent future accidental regressions.
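
For context, a minimal sketch of the behavior under test, from a user's point of view (the paths and `orc` format here are illustrative, not part of this patch):

```scala
// When spark.sql.files.ignoreMissingFiles is true, files that were present
// when the read was planned but deleted before execution are skipped
// instead of failing the query.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
val df = spark.read.format("orc").load("/data/first", "/data/second")
df.count()  // succeeds even if some listed files disappear mid-query
```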

## How was this patch tested?

Passes Jenkins with a newly added test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20479 from dongjoon-hyun/SPARK-23305.
dongjoon-hyun authored and gatorsmile committed Feb 3, 2018
1 parent 63b49fa commit 522e0b1
Showing 2 changed files with 37 additions and 33 deletions.
sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

@@ -17,6 +17,10 @@

package org.apache.spark.sql

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
@@ -92,4 +96,37 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
      }
    }
  }

  allFileBasedDataSources.foreach { format =>
    testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") {
      def testIgnoreMissingFiles(): Unit = {
        withTempDir { dir =>
          val basePath = dir.getCanonicalPath
          Seq("0").toDF("a").write.format(format).save(new Path(basePath, "first").toString)
          Seq("1").toDF("a").write.format(format).save(new Path(basePath, "second").toString)
          val thirdPath = new Path(basePath, "third")
          Seq("2").toDF("a").write.format(format).save(thirdPath.toString)
          // Plan the scan over all three directories before deleting one of
          // them, so the deleted files are still in the planned file listing.
          val df = spark.read.format(format).load(
            new Path(basePath, "first").toString,
            new Path(basePath, "second").toString,
            new Path(basePath, "third").toString)

          val fs = thirdPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
          assert(fs.delete(thirdPath, true))
          // With ignoreMissingFiles enabled, only the two surviving
          // directories contribute rows; disabled, the read throws.
          checkAnswer(df, Seq(Row("0"), Row("1")))
        }
      }

      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
        testIgnoreMissingFiles()
      }

      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
        val exception = intercept[SparkException] {
          testIgnoreMissingFiles()
        }
        assert(exception.getMessage().contains("does not exist"))
      }
    }
  }
}
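
For reference, the same scenario can be reproduced outside the test harness; a rough spark-shell sketch (the `/tmp` path and `json` format are illustrative):

```scala
import org.apache.hadoop.fs.Path
import spark.implicits._  // already in scope in spark-shell; needed in an application

spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")

val base = "/tmp/ignore-missing-demo"  // illustrative location
Seq("0").toDF("a").write.json(s"$base/first")
Seq("1").toDF("a").write.json(s"$base/second")
Seq("2").toDF("a").write.json(s"$base/third")

// The file listing happens here, while all three directories still exist.
val df = spark.read.json(s"$base/first", s"$base/second", s"$base/third")

// Delete one directory before any action runs.
val third = new Path(s"$base/third")
val fs = third.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.delete(third, true)

df.show()  // shows rows "0" and "1"; the missing directory is skipped
```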
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

@@ -355,39 +355,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
    }
  }

  testQuietly("Enabling/disabling ignoreMissingFiles") {
    def testIgnoreMissingFiles(): Unit = {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
        val thirdPath = new Path(basePath, "third")
        spark.range(2, 3).toDF("a").write.parquet(thirdPath.toString)
        val df = spark.read.parquet(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString,
          new Path(basePath, "third").toString)

        val fs = thirdPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
        fs.delete(thirdPath, true)
        checkAnswer(
          df,
          Seq(Row(0), Row(1)))
      }
    }

    withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
      testIgnoreMissingFiles()
    }

    withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
      val exception = intercept[SparkException] {
        testIgnoreMissingFiles()
      }
      assert(exception.getMessage().contains("does not exist"))
    }
  }

  /**
   * this is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop
   * to increase the chance of failure
