Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed May 3, 2017
1 parent fc17a2e commit 2537437
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
import org.apache.spark.sql.types._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import java.util.TimeZone
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
Expand Down Expand Up @@ -207,10 +206,10 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
val defaultTz = None
// check that created tables have correct TBLPROPERTIES
val tblProperties = explicitTz.map {
tz => raw"""TBLPROPERTIES ($key="$tz")"""
tz => s"""TBLPROPERTIES ($key="$tz")"""
}.getOrElse("")
spark.sql(
raw"""CREATE TABLE $baseTable (
s"""CREATE TABLE $baseTable (
| x int
| )
| STORED AS PARQUET
Expand All @@ -219,7 +218,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
val expectedTableTz = explicitTz.orElse(defaultTz)
checkHasTz(spark, baseTable, expectedTableTz)
spark.sql(
raw"""CREATE TABLE partitioned_$baseTable (
s"""CREATE TABLE partitioned_$baseTable (
| x int
| )
| PARTITIONED BY (y int)
Expand All @@ -230,7 +229,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
spark.sql(s"CREATE TABLE like_$baseTable LIKE $baseTable")
checkHasTz(spark, s"like_$baseTable", expectedTableTz)
spark.sql(
raw"""CREATE TABLE select_$baseTable
s"""CREATE TABLE select_$baseTable
| STORED AS PARQUET
| AS
| SELECT * from $baseTable
Expand All @@ -239,14 +238,14 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi

// check alter table, setting, unsetting, resetting the property
spark.sql(
raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="America/Los_Angeles")""")
s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="America/Los_Angeles")""")
checkHasTz(spark, baseTable, Some("America/Los_Angeles"))
spark.sql(raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""")
spark.sql(s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""")
checkHasTz(spark, baseTable, Some("UTC"))
spark.sql(raw"""ALTER TABLE $baseTable UNSET TBLPROPERTIES ($key)""")
spark.sql(s"""ALTER TABLE $baseTable UNSET TBLPROPERTIES ($key)""")
checkHasTz(spark, baseTable, None)
explicitTz.foreach { tz =>
spark.sql(raw"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="$tz")""")
spark.sql(s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="$tz")""")
checkHasTz(spark, baseTable, expectedTableTz)
}
}
Expand All @@ -267,16 +266,13 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
val timestampTimezoneToMillis = {
val originalTz = TimeZone.getDefault
try {
(for {
timestampString <- desiredTimestampStrings
timezone <- Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map {
TimeZone.getTimeZone(_)
desiredTimestampStrings.flatMap { timestampString =>
Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map { tzId =>
TimeZone.setDefault(TimeZone.getTimeZone(tzId))
val timestamp = Timestamp.valueOf(timestampString)
(timestampString, tzId) -> timestamp.getTime()
}
} yield {
TimeZone.setDefault(timezone)
val timestamp = Timestamp.valueOf(timestampString)
(timestampString, timezone.getID()) -> timestamp.getTime()
}).toMap
}.toMap
} finally {
TimeZone.setDefault(originalTz)
}
Expand All @@ -303,18 +299,17 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
val sessionTzId = sessionTzOpt.getOrElse(TimeZone.getDefault().getID())
// check that created tables have correct TBLPROPERTIES
val tblProperties = explicitTz.map {
tz => raw"""TBLPROPERTIES ($key="$tz")"""
tz => s"""TBLPROPERTIES ($key="$tz")"""
}.getOrElse("")


val rawData = createRawData(spark)
// Check writing data out.
// We write data into our tables, and then check the raw parquet files to see whether
// the correct conversion was applied.
rawData.write.saveAsTable(s"saveAsTable_$baseTable")
checkHasTz(spark, s"saveAsTable_$baseTable", None)
spark.sql(
raw"""CREATE TABLE insert_$baseTable (
s"""CREATE TABLE insert_$baseTable (
| display string,
| ts timestamp
| )
Expand Down Expand Up @@ -370,7 +365,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
baseTable: String,
explicitTz: Option[String],
sessionTzOpt: Option[String]): Unit = {
val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
test(s"SPARK-12297: Read from Parquet tables with Timestamps; explicitTz = $explicitTz; " +
s"sessionTzOpt = $sessionTzOpt") {
withTable(s"external_$baseTable", s"partitioned_$baseTable") {
Expand Down Expand Up @@ -412,8 +407,8 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
.parquet(partitionedPath.getCanonicalPath)
// unfortunately, catalog.createTable() doesn't let us specify partitioning, so just use
// a "CREATE TABLE" stmt.
val tblOpts = explicitTz.map { tz => raw"""TBLPROPERTIES ($key="$tz")""" }.getOrElse("")
spark.sql(raw"""CREATE EXTERNAL TABLE partitioned_$baseTable (
val tblOpts = explicitTz.map { tz => s"""TBLPROPERTIES ($key="$tz")""" }.getOrElse("")
spark.sql(s"""CREATE EXTERNAL TABLE partitioned_$baseTable (
| display string,
| ts timestamp
|)
Expand Down Expand Up @@ -512,7 +507,7 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
val badTzException = intercept[AnalysisException] {
spark.sql(
raw"""CREATE TABLE bad_tz_table (
s"""CREATE TABLE bad_tz_table (
| x int
| )
| STORED AS PARQUET
Expand Down

0 comments on commit 2537437

Please sign in to comment.