Skip to content

Commit

Permalink
Added more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
brkyvz committed Dec 20, 2019
1 parent a441604 commit 5c11b94
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
sparkSession.sessionState.catalogManager,
dsOptions)
catalog.loadTable(ident)
case other =>
case _ =>
userSpecifiedSchema match {
case Some(schema) => provider.getTable(dsOptions, schema)
case _ => provider.getTable(dsOptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
import testImplicits._

private val catalogName = "testcat"
private val format = classOf[CatalogSupportingInMemoryTableProvider].getName

private def catalog(name: String): InMemoryTableSessionCatalog = {
spark.sessionState.catalogManager.catalog(name).asInstanceOf[InMemoryTableSessionCatalog]
Expand All @@ -59,40 +60,48 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
spark.conf.unset(s"spark.sql.catalog.$catalogName")
}

def dataFrameWriterTests(withCatalogOption: Option[String]): Unit = {
def testWithDifferentCatalogs(withCatalogOption: Option[String]): Unit = {
Seq(SaveMode.ErrorIfExists, SaveMode.Ignore).foreach { saveMode =>
test(s"save works with $saveMode - no table, no partitioning, session catalog") {
val df = spark.range(10)
val dfw = df.write.mode(saveMode).option("name", "t1")
val dfw = df.write.format(format).mode(saveMode).option("name", "t1")
withCatalogOption.foreach(cName => dfw.option("catalog", cName))
dfw.save()

val table = catalog(SESSION_CATALOG_NAME).loadTable("t1")
val table = catalog(withCatalogOption.getOrElse(SESSION_CATALOG_NAME)).loadTable("t1")
assert(table.name() === "t1", "Table identifier was wrong")
assert(table.partitioning().isEmpty, "Partitioning should be empty")
assert(table.schema() === df.schema.asNullable, "Schema did not match")

val dfr = spark.read.format(format).option("name", "t1")
withCatalogOption.foreach(cName => dfr.option("catalog", cName))
checkAnswer(dfr.load(), df.toDF())
}

test(s"save works with $saveMode - no table, with partitioning, session catalog") {
val df = spark.range(10).withColumn("part", 'id % 5)
val dfw = df.write.mode(saveMode).option("name", "t1").partitionBy("part")
val dfw = df.write.format(format).mode(saveMode).option("name", "t1").partitionBy("part")
withCatalogOption.foreach(cName => dfw.option("catalog", cName))
dfw.save()

val table = catalog(SESSION_CATALOG_NAME).loadTable("t1")
val table = catalog(withCatalogOption.getOrElse(SESSION_CATALOG_NAME)).loadTable("t1")
assert(table.name() === "t1", "Table identifier was wrong")
assert(table.partitioning().length === 1, "Partitioning should not be empty")
assert(table.partitioning().head.references().head.fieldNames().head === "part",
"Partitioning was incorrect")
assert(table.schema() === df.schema.asNullable, "Schema did not match")

val dfr = spark.read.format(format).option("name", "t1")
withCatalogOption.foreach(cName => dfr.option("catalog", cName))
checkAnswer(dfr.load(), df.toDF())
}
}

test("save fails with ErrorIfExists if table exists") {
sql("create table t1 (id bigint) using foo")
val df = spark.range(10)
intercept[TableAlreadyExistsException] {
val dfw = df.write.option("name", "t1")
val dfw = df.write.format(format).option("name", "t1")
withCatalogOption.foreach(cName => dfw.option("catalog", cName))
dfw.save()
}
Expand All @@ -102,7 +111,7 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
sql("create table t1 (id bigint) using foo")
val df = spark.range(10).withColumn("part", 'id % 5)
intercept[TableAlreadyExistsException] {
val dfw = df.write.mode(SaveMode.Ignore).option("name", "t1")
val dfw = df.write.format(format).mode(SaveMode.Ignore).option("name", "t1")
withCatalogOption.foreach(cName => dfw.option("catalog", cName))
dfw.save()
}
Expand All @@ -113,9 +122,9 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
}
}

dataFrameWriterTests(None)
testWithDifferentCatalogs(None)

dataFrameWriterTests(Some(catalogName))
testWithDifferentCatalogs(Some(catalogName))
}

class CatalogSupportingInMemoryTableProvider
Expand Down

0 comments on commit 5c11b94

Please sign in to comment.