Commit 7f0ba76

fix-code-style

GuoPhilipse committed May 21, 2020
1 parent a39067d commit 7f0ba76
Showing 2 changed files with 86 additions and 161 deletions.
SQLConf.scala
@@ -621,7 +621,7 @@ object SQLConf {
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(ParquetOutputTimestampType.values.map(_.toString))
.createWithDefault(ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)
.createWithDefault(ParquetOutputTimestampType.INT96.toString)

val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
.doc("Sets the compression codec used when writing Parquet files. If either `compression` or " +
@@ -845,8 +845,10 @@ object SQLConf {
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
"to produce the partition columns instead of table scans. It applies when all the columns " +
"scanned are partition columns and the query has an aggregate operator that satisfies " +
"distinct semantics. By default the optimization is disabled, since it may return " +
"incorrect results when the files are empty.")
"distinct semantics. By default the optimization is disabled, and deprecated as of Spark " +
"3.0 since it may return incorrect results when the files are empty, see also SPARK-26709." +
"It will be removed in the future releases. If you must use, use 'SparkSessionExtensions' " +
"instead to inject it as a custom rule.")
.version("2.1.1")
.booleanConf
.createWithDefault(false)
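
As the updated doc text suggests, the optimization can still be injected as a custom rule. A hedged sketch, where MyMetadataOnlyRule is a hypothetical Rule[LogicalPlan] supplied by the user, not part of this diff:

import org.apache.spark.sql.SparkSession

// Inject a user-supplied optimizer rule (hypothetical class) instead of
// relying on the deprecated metadata-only conf.
val spark = SparkSession.builder()
  .withExtensions(_.injectOptimizerRule(session => new MyMetadataOnlyRule(session)))
  .getOrCreate()
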
@@ -2063,16 +2065,18 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val NESTED_PREDICATE_PUSHDOWN_ENABLED =
buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
val NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST =
buildConf("spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources")
.internal()
.doc("When true, Spark tries to push down predicates for nested columns and or names " +
"containing `dots` to data sources. Currently, Parquet implements both optimizations " +
"while ORC only supports predicates for names containing `dots`. The other data sources" +
"don't support this feature yet.")
.doc("A comma-separated list of data source short names or fully qualified data source " +
"implementation class names for which Spark tries to push down predicates for nested " +
"columns and/or names containing `dots` to data sources. This configuration is only " +
"effective with file-based data source in DSv1. Currently, Parquet implements " +
"both optimizations while ORC only supports predicates for names containing `dots`. The " +
"other data sources don't support this feature yet. So the default value is 'parquet,orc'.")
.version("3.0.0")
.booleanConf
.createWithDefault(true)
.stringConf
.createWithDefault("parquet,orc")

val SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED =
buildConf("spark.sql.optimizer.serializer.nestedSchemaPruning.enabled")
@@ -2224,15 +2228,6 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED =
buildConf("spark.sql.legacy.createHiveTableByDefault.enabled")
.internal()
.doc("When set to true, CREATE TABLE syntax without a provider will use hive " +
s"instead of the value of ${DEFAULT_DATA_SOURCE_NAME.key}.")
.version("3.0.0")
.booleanConf
.createWithDefault(false)

val LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING =
buildConf("spark.sql.legacy.bucketedTableScan.outputOrdering")
.internal()
@@ -2524,61 +2519,75 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE =
buildConf("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled")
val LEGACY_PARQUET_REBASE_MODE_IN_WRITE =
buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInWrite")
.internal()
.doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
"to the hybrid calendar (Julian + Gregorian) in write. " +
"The rebasing is performed by converting micros/millis/days to " +
"a local date/timestamp in the source calendar, interpreting the resulted date/" +
"timestamp in the target calendar, and getting the number of micros/millis/days " +
"since the epoch 1970-01-01 00:00:00Z.")
.doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
"to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files. " +
"When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
"When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
"ancient dates/timestamps that are ambiguous between the two calendars.")
.version("3.0.0")
.booleanConf
.createWithDefault(false)

val LEGACY_PARQUET_REBASE_DATETIME_IN_READ =
buildConf("spark.sql.legacy.parquet.rebaseDateTimeInRead.enabled")
.internal()
.doc("When true, rebase dates/timestamps " +
"from the hybrid calendar to Proleptic Gregorian calendar in read. " +
"The rebasing is performed by converting micros/millis/days to " +
"a local date/timestamp in the source calendar, interpreting the resulted date/" +
"timestamp in the target calendar, and getting the number of micros/millis/days " +
"since the epoch 1970-01-01 00:00:00Z.")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
.createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)

val LEGACY_PARQUET_REBASE_MODE_IN_READ =
buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInRead")
.internal()
.doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " +
"Gregorian) calendar to Proleptic Gregorian calendar when reading Parquet files. " +
"When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " +
"When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
"ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
"only effective if the writer info (like Spark, Hive) of the Parquet files is unknown.")
.version("3.0.0")
.booleanConf
.createWithDefault(false)
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
.createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
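
The two Parquet rebase modes above can be relaxed per job. A hedged sketch (the path is hypothetical) of writing an ancient date under CORRECTED semantics instead of failing under the EXCEPTION default:

// Opt out of the fail-fast default for this session only.
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
spark.sql("SELECT DATE'1500-01-01' AS d").write.parquet("/tmp/ancient_dates")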

val LEGACY_AVRO_REBASE_DATETIME_IN_WRITE =
buildConf("spark.sql.legacy.avro.rebaseDateTimeInWrite.enabled")
val LEGACY_AVRO_REBASE_MODE_IN_WRITE =
buildConf("spark.sql.legacy.avro.datetimeRebaseModeInWrite")
.internal()
.doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
"to the hybrid calendar (Julian + Gregorian) in write. " +
"The rebasing is performed by converting micros/millis/days to " +
"a local date/timestamp in the source calendar, interpreting the resulted date/" +
"timestamp in the target calendar, and getting the number of micros/millis/days " +
"since the epoch 1970-01-01 00:00:00Z.")
.doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
"to the legacy hybrid (Julian + Gregorian) calendar when writing Avro files. " +
"When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
"When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
"ancient dates/timestamps that are ambiguous between the two calendars.")
.version("3.0.0")
.booleanConf
.createWithDefault(false)
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
.createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)

val LEGACY_AVRO_REBASE_MODE_IN_READ =
buildConf("spark.sql.legacy.avro.datetimeRebaseModeInRead")
.internal()
.doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " +
"Gregorian) calendar to Proleptic Gregorian calendar when reading Avro files. " +
"When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " +
"When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
"ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
"only effective if the writer info (like Spark, Hive) of the Avro files is unknown.")
.version("3.0.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
.createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
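
The Avro configs mirror the Parquet ones. A hedged read-side sketch (requires the external spark-avro package; the path is hypothetical):

// Force rebasing when reading files whose writer info is unknown.
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY")
spark.read.format("avro").load("/tmp/ancient_avro").show()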

val LEGACY_AVRO_REBASE_DATETIME_IN_READ =
buildConf("spark.sql.legacy.avro.rebaseDateTimeInRead.enabled")
val SCRIPT_TRANSFORMATION_EXIT_TIMEOUT =
buildConf("spark.sql.scriptTransformation.exitTimeoutInSeconds")
.internal()
.doc("When true, rebase dates/timestamps " +
"from the hybrid calendar to Proleptic Gregorian calendar in read. " +
"The rebasing is performed by converting micros/millis/days to " +
"a local date/timestamp in the source calendar, interpreting the resulted date/" +
"timestamp in the target calendar, and getting the number of micros/millis/days " +
"since the epoch 1970-01-01 00:00:00Z.")
.doc("Timeout for executor to wait for the termination of transformation script when EOF.")
.version("3.0.0")
.booleanConf
.createWithDefault(false)
.timeConf(TimeUnit.SECONDS)
.checkValue(_ > 0, "The timeout value must be positive")
.createWithDefault(10L)
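
The new timeout can be raised for scripts that flush slowly on shutdown. A hedged sketch (the table kv_table and the use of 'cat' are hypothetical, and TRANSFORM may additionally require Hive support depending on the serde):

// Give transformation scripts 30 seconds to exit after their input is closed.
spark.conf.set("spark.sql.scriptTransformation.exitTimeoutInSeconds", "30")
spark.sql("SELECT TRANSFORM (k, v) USING 'cat' AS (k, v) FROM kv_table").show()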

val LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE =
buildConf("spark.sql.legacy.numericConvertToTimestampEnable")
.internal()
.doc("when true,use legacy numberic can convert to timestamp")
.version("3.0.0")
.booleanConf
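
Once enabled, the legacy cast from this branch could be exercised as follows (hedged sketch; the conf only exists on this SPARK-31710 branch, and whether the value is taken as seconds or milliseconds depends on the companion LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_IN_SECONDS conf referenced in the test below):

// Branch-specific conf from this diff, not a released Spark setting.
spark.conf.set("spark.sql.legacy.numericConvertToTimestampEnable", "true")
spark.sql("SELECT CAST(1590000000 AS TIMESTAMP)").show()
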
@@ -2622,7 +2631,10 @@ object SQLConf {
DeprecatedConfig(ARROW_FALLBACK_ENABLED.key, "3.0",
s"Use '${ARROW_PYSPARK_FALLBACK_ENABLED.key}' instead of it."),
DeprecatedConfig(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "3.0",
s"Use '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' instead of it.")
s"Use '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' instead of it."),
DeprecatedConfig(OPTIMIZER_METADATA_ONLY.key, "3.0",
"Avoid to depend on this optimization to prevent a potential correctness issue. " +
"If you must use, use 'SparkSessionExtensions' instead to inject it as a custom rule.")
)

Map(configs.map { cfg => cfg.key -> cfg } : _*)
@@ -3115,8 +3127,6 @@ class SQLConf extends Serializable with Logging {

def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)

def nestedPredicatePushdownEnabled: Boolean = getConf(NESTED_PREDICATE_PUSHDOWN_ENABLED)

def serializerNestedSchemaPruningEnabled: Boolean =
getConf(SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED)

@@ -3150,9 +3160,6 @@ class SQLConf extends Serializable with Logging {
def allowNegativeScaleOfDecimalEnabled: Boolean =
getConf(SQLConf.LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED)

def createHiveTableByDefaultEnabled: Boolean =
getConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED)

def truncateTableIgnorePermissionAcl: Boolean =
getConf(SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL)

CastSuite.scala
@@ -1300,97 +1300,15 @@ class CastSuite extends CastSuiteBase {
}
}

test("SPARK-31710:Add legacy when casting long to timestamp") {
withSQLConf(
SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE.key -> "true",
SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_IN_SECONDS.key -> "false") {
def checkLongToTimestamp(l: Long, expected: Long): Unit = {
checkEvaluation(cast(l, TimestampType), expected)
}
checkLongToTimestamp(253402272000L, 253402272000000L)
checkLongToTimestamp(-5L, -5000L)
checkLongToTimestamp(1L, 1000L)
checkLongToTimestamp(0L, 0L)
checkLongToTimestamp(123L, 123000L)
}
withSQLConf(
SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE.key -> "true",
SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_IN_SECONDS.key -> "true") {
def checkLongToTimestamp(l: Long, expected: Long): Unit = {
checkEvaluation(cast(l, TimestampType), expected)
}
checkLongToTimestamp(253402272000L, 253402272000000000L)
checkLongToTimestamp(-5L, -5000000L)
checkLongToTimestamp(1L, 1000000L)
checkLongToTimestamp(0L, 0L)
checkLongToTimestamp(123L, 123000000L)
}

withSQLConf(
SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE.key -> "false",
SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_IN_SECONDS.key -> "false") {
def checkByteToTimestamp(b: Byte, expected: Long): Unit = {
assert(!cast(b, TimestampType).resolved)
}
def checkShortToTimestamp(s: Short, expected: Long): Unit = {
assert(!cast(s, TimestampType).resolved)
}
def checkIntToTimestamp(str: Int, expected: Long): Unit = {
assert(!cast(str, TimestampType).resolved)
}
def checkLongToTimestamp(l: Long, expected: Long): Unit = {
assert(!cast(l, TimestampType).resolved)
}
def checkDecimalToTimestamp(d: Decimal, expected: Long): Unit = {
assert(!cast(d, TimestampType).resolved)
}
def checkFloatToTimestamp(f: Float, expected: Long): Unit = {
assert(!cast(f, TimestampType).resolved)
}
def checkDoubleToTimestamp(d: Double, expected: Long): Unit = {
assert(!cast(d, TimestampType).resolved)
}
checkByteToTimestamp(1.toByte, 0L)
checkShortToTimestamp(1.toShort, 0L)
checkIntToTimestamp(1, 0L)
checkLongToTimestamp(1L, 0L)
checkDecimalToTimestamp(Decimal(1.5), 0L)
checkFloatToTimestamp(1.5f, 0L)
checkDoubleToTimestamp(2.1D, 0L)
}

withSQLConf(
SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE.key -> "false",
SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_IN_SECONDS.key -> "true") {
def checkByteToTimestamp(b: Byte, expected: Long): Unit = {
assert(!cast(b, TimestampType).resolved)
}
def checkShortToTimestamp(s: Short, expected: Long): Unit = {
assert(!cast(s, TimestampType).resolved)
}
def checkIntToTimestamp(str: Int, expected: Long): Unit = {
assert(!cast(str, TimestampType).resolved)
}
def checkLongToTimestamp(l: Long, expected: Long): Unit = {
assert(!cast(l, TimestampType).resolved)
}
def checkDecimalToTimestamp(d: Decimal, expected: Long): Unit = {
assert(!cast(d, TimestampType).resolved)
}
def checkFloatToTimestamp(f: Float, expected: Long): Unit = {
assert(!cast(f, TimestampType).resolved)
}
def checkDoubleToTimestamp(d: Double, expected: Long): Unit = {
assert(!cast(d, TimestampType).resolved)
}

checkByteToTimestamp(1.toByte, 0L)
checkShortToTimestamp(1.toShort, 0L)
checkIntToTimestamp(1, 0L)
checkLongToTimestamp(1L, 0L)
checkDecimalToTimestamp(Decimal(1.5), 0L)
checkFloatToTimestamp(1.5f, 0L)
checkDoubleToTimestamp(2.1D, 0L)
test("cast a timestamp before the epoch 1970-01-01 00:00:00Z") {
withDefaultTimeZone(UTC) {
val negativeTs = Timestamp.valueOf("1900-05-05 18:34:56.1")
assert(negativeTs.getTime < 0)
val expectedSecs = Math.floorDiv(negativeTs.getTime, MILLIS_PER_SECOND)
checkEvaluation(cast(negativeTs, ByteType), expectedSecs.toByte)
checkEvaluation(cast(negativeTs, ShortType), expectedSecs.toShort)
checkEvaluation(cast(negativeTs, IntegerType), expectedSecs.toInt)
checkEvaluation(cast(negativeTs, LongType), expectedSecs)
}
}
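
One detail worth noting in the replacement test: Math.floorDiv, not integer division, is required for pre-epoch timestamps. A minimal self-contained check (plain Scala, no Spark needed):

val millis = -1500L                          // 1.5 seconds before the epoch
assert(millis / 1000L == -1L)                // `/` truncates toward zero: off by one second
assert(Math.floorDiv(millis, 1000L) == -2L)  // floorDiv rounds toward negative infinity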

@@ -1492,7 +1410,7 @@ class CastSuite extends CastSuiteBase {
checkFloatToTimestamp(1.5f, 0L)
checkDoubleToTimestamp(2.1D, 0L)
}
}
}
}

/**
