From 7f0ba76a5e9d4604b2f586b1e1bc512f7675115b Mon Sep 17 00:00:00 2001
From: GuoPhilipse
Date: Thu, 21 May 2020 08:53:19 +0800
Subject: [PATCH] fix-code-style

---
 .../apache/spark/sql/internal/SQLConf.scala   | 145 +++++++++---
 .../sql/catalyst/expressions/CastSuite.scala  | 102 ++----------
 2 files changed, 86 insertions(+), 161 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 531e6801a8ac5..b60dbe869f573 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -621,7 +621,7 @@ object SQLConf {
     .stringConf
     .transform(_.toUpperCase(Locale.ROOT))
     .checkValues(ParquetOutputTimestampType.values.map(_.toString))
-    .createWithDefault(ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)
+    .createWithDefault(ParquetOutputTimestampType.INT96.toString)
 
   val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
     .doc("Sets the compression codec used when writing Parquet files. If either `compression` or " +
@@ -845,8 +845,10 @@ object SQLConf {
     .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
       "to produce the partition columns instead of table scans. It applies when all the columns " +
       "scanned are partition columns and the query has an aggregate operator that satisfies " +
-      "distinct semantics. By default the optimization is disabled, since it may return " +
-      "incorrect results when the files are empty.")
+      "distinct semantics. By default the optimization is disabled, and deprecated as of Spark " +
+      "3.0 since it may return incorrect results when the files are empty, see also SPARK-26709." +
+      "It will be removed in the future releases. If you must use, use 'SparkSessionExtensions' " +
+      "instead to inject it as a custom rule.")
     .version("2.1.1")
     .booleanConf
     .createWithDefault(false)
@@ -2063,16 +2065,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val NESTED_PREDICATE_PUSHDOWN_ENABLED =
-    buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
+  val NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST =
+    buildConf("spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources")
       .internal()
-      .doc("When true, Spark tries to push down predicates for nested columns and or names " +
-        "containing `dots` to data sources. Currently, Parquet implements both optimizations " +
-        "while ORC only supports predicates for names containing `dots`. The other data sources" +
-        "don't support this feature yet.")
+      .doc("A comma-separated list of data source short names or fully qualified data source " +
+        "implementation class names for which Spark tries to push down predicates for nested " +
+        "columns and/or names containing `dots` to data sources. This configuration is only " +
+        "effective with file-based data source in DSv1. Currently, Parquet implements " +
+        "both optimizations while ORC only supports predicates for names containing `dots`. The " +
+        "other data sources don't support this feature yet. So the default value is 'parquet,orc'.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(true)
+      .stringConf
+      .createWithDefault("parquet,orc")
 
   val SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED =
     buildConf("spark.sql.optimizer.serializer.nestedSchemaPruning.enabled")
@@ -2224,15 +2228,6 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED =
-    buildConf("spark.sql.legacy.createHiveTableByDefault.enabled")
-      .internal()
-      .doc("When set to true, CREATE TABLE syntax without a provider will use hive " +
-        s"instead of the value of ${DEFAULT_DATA_SOURCE_NAME.key}.")
-      .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
-
   val LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING =
     buildConf("spark.sql.legacy.bucketedTableScan.outputOrdering")
       .internal()
@@ -2524,61 +2519,75 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE =
-    buildConf("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled")
+  val LEGACY_PARQUET_REBASE_MODE_IN_WRITE =
+    buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInWrite")
      .internal()
-      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
-        "to the hybrid calendar (Julian + Gregorian) in write. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-        "timestamp in the target calendar, and getting the number of micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+      .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files. " +
+        "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
+        "ancient dates/timestamps that are ambiguous between the two calendars.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
-
-  val LEGACY_PARQUET_REBASE_DATETIME_IN_READ =
-    buildConf("spark.sql.legacy.parquet.rebaseDateTimeInRead.enabled")
-      .internal()
-      .doc("When true, rebase dates/timestamps " +
-        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-        "timestamp in the target calendar, and getting the number of micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+
+  val LEGACY_PARQUET_REBASE_MODE_IN_READ =
+    buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInRead")
+      .internal()
+      .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " +
+        "Gregorian) calendar to Proleptic Gregorian calendar when reading Parquet files. " +
+        "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
+        "ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
+        "only effective if the writer info (like Spark, Hive) of the Parquet files is unknown.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
 
-  val LEGACY_AVRO_REBASE_DATETIME_IN_WRITE =
-    buildConf("spark.sql.legacy.avro.rebaseDateTimeInWrite.enabled")
+  val LEGACY_AVRO_REBASE_MODE_IN_WRITE =
+    buildConf("spark.sql.legacy.avro.datetimeRebaseModeInWrite")
       .internal()
-      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
-        "to the hybrid calendar (Julian + Gregorian) in write. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-        "timestamp in the target calendar, and getting the number of micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+      .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the legacy hybrid (Julian + Gregorian) calendar when writing Avro files. " +
+        "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
+        "ancient dates/timestamps that are ambiguous between the two calendars.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+
+  val LEGACY_AVRO_REBASE_MODE_IN_READ =
+    buildConf("spark.sql.legacy.avro.datetimeRebaseModeInRead")
+      .internal()
+      .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " +
+        "Gregorian) calendar to Proleptic Gregorian calendar when reading Avro files. " +
+        "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " +
+        "When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
+        "ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
+        "only effective if the writer info (like Spark, Hive) of the Avro files is unknown.")
+      .version("3.0.0")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
+      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
 
-  val LEGACY_AVRO_REBASE_DATETIME_IN_READ =
-    buildConf("spark.sql.legacy.avro.rebaseDateTimeInRead.enabled")
+  val SCRIPT_TRANSFORMATION_EXIT_TIMEOUT =
+    buildConf("spark.sql.scriptTransformation.exitTimeoutInSeconds")
       .internal()
-      .doc("When true, rebase dates/timestamps " +
-        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
-        "The rebasing is performed by converting micros/millis/days to " +
-        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
-        "timestamp in the target calendar, and getting the number of micros/millis/days " +
-        "since the epoch 1970-01-01 00:00:00Z.")
+      .doc("Timeout for executor to wait for the termination of transformation script when EOF.")
       .version("3.0.0")
-      .booleanConf
-      .createWithDefault(false)
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ > 0, "The timeout value must be positive")
+      .createWithDefault(10L)
 
   val LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE =
     buildConf("spark.sql.legacy.numericConvertToTimestampEnable")
-      .internal()
       .doc("when true,use legacy numberic can convert to timestamp")
       .version("3.0.0")
       .booleanConf
@@ -2622,7 +2631,10 @@ object SQLConf {
       DeprecatedConfig(ARROW_FALLBACK_ENABLED.key, "3.0",
         s"Use '${ARROW_PYSPARK_FALLBACK_ENABLED.key}' instead of it."),
       DeprecatedConfig(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "3.0",
-        s"Use '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' instead of it.")
+        s"Use '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' instead of it."),
+      DeprecatedConfig(OPTIMIZER_METADATA_ONLY.key, "3.0",
+        "Avoid to depend on this optimization to prevent a potential correctness issue. " +
+        "If you must use, use 'SparkSessionExtensions' instead to inject it as a custom rule.")
     )
 
     Map(configs.map { cfg => cfg.key -> cfg } : _*)
@@ -3115,8 +3127,6 @@ class SQLConf extends Serializable with Logging {
 
   def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)
 
-  def nestedPredicatePushdownEnabled: Boolean = getConf(NESTED_PREDICATE_PUSHDOWN_ENABLED)
-
   def serializerNestedSchemaPruningEnabled: Boolean =
     getConf(SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED)
 
@@ -3150,9 +3160,6 @@ class SQLConf extends Serializable with Logging {
   def allowNegativeScaleOfDecimalEnabled: Boolean =
     getConf(SQLConf.LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED)
 
-  def createHiveTableByDefaultEnabled: Boolean =
-    getConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED)
-
   def truncateTableIgnorePermissionAcl: Boolean =
     getConf(SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL)
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index 362703047215d..5c4779e53f7b5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -1300,97 +1300,15 @@ class CastSuite extends CastSuiteBase {
     }
   }
 
-  test("SPARK-31710:Add legacy when casting long to timestamp") {
-    withSQLConf(
-      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE.key -> "true",
-      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_IN_SECONDS.key -> "false") {
-      def checkLongToTimestamp(l: Long, expected: Long): Unit = {
-        checkEvaluation(cast(l, TimestampType), expected)
-      }
-      checkLongToTimestamp(253402272000L, 253402272000000L)
-      checkLongToTimestamp(-5L, -5000L)
-      checkLongToTimestamp(1L, 1000L)
-      checkLongToTimestamp(0L, 0L)
-      checkLongToTimestamp(123L, 123000L)
-    }
-    withSQLConf(
-      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE.key -> "true",
-      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_IN_SECONDS.key -> "true") {
-      def checkLongToTimestamp(l: Long, expected: Long): Unit = {
-        checkEvaluation(cast(l, TimestampType), expected)
-      }
-      checkLongToTimestamp(253402272000L, 253402272000000000L)
-      checkLongToTimestamp(-5L, -5000000L)
-      checkLongToTimestamp(1L, 1000000L)
-      checkLongToTimestamp(0L, 0L)
-      checkLongToTimestamp(123L, 123000000L)
-    }
-
-    withSQLConf(
-      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE.key -> "false",
-      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_IN_SECONDS.key -> "false") {
-      def checkByteToTimestamp(b: Byte, expected: Long): Unit = {
-        assert(!cast(b, TimestampType).resolved)
-      }
-      def checkShortToTimestamp(s: Short, expected: Long): Unit = {
-        assert(!cast(s, TimestampType).resolved)
-      }
-      def checkIntToTimestamp(str: Int, expected: Long): Unit = {
-        assert(!cast(str, TimestampType).resolved)
-      }
-      def checkLongToTimestamp(l: Long, expected: Long): Unit = {
-        assert(!cast(l, TimestampType).resolved)
-      }
-      def checkDecimalToTimestamp(d: Decimal, expected: Long): Unit = {
-        assert(!cast(d, TimestampType).resolved)
-      }
-      def checkFloatToTimestamp(f: Float, expected: Long): Unit = {
-        assert(!cast(f, TimestampType).resolved)
-      }
-      def checkDoubleToTimestamp(d: Double, expected: Long): Unit = {
-        assert(!cast(d, TimestampType).resolved)
-      }
-      checkByteToTimestamp(1.toByte, 0L)
-      checkShortToTimestamp(1.toShort, 0L)
-      checkIntToTimestamp(1, 0L)
-      checkLongToTimestamp(1L, 0L)
-      checkDecimalToTimestamp(Decimal(1.5), 0L)
-      checkFloatToTimestamp(1.5f, 0L)
-      checkDoubleToTimestamp(2.1D, 0L)
-    }
-
-    withSQLConf(
-      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE.key -> "false",
-      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_IN_SECONDS.key -> "true") {
-      def checkByteToTimestamp(b: Byte, expected: Long): Unit = {
-        assert(!cast(b, TimestampType).resolved)
-      }
-      def checkShortToTimestamp(s: Short, expected: Long): Unit = {
-        assert(!cast(s, TimestampType).resolved)
-      }
-      def checkIntToTimestamp(str: Int, expected: Long): Unit = {
-        assert(!cast(str, TimestampType).resolved)
-      }
-      def checkLongToTimestamp(l: Long, expected: Long): Unit = {
-        assert(!cast(l, TimestampType).resolved)
-      }
-      def checkDecimalToTimestamp(d: Decimal, expected: Long): Unit = {
-        assert(!cast(d, TimestampType).resolved)
-      }
-      def checkFloatToTimestamp(f: Float, expected: Long): Unit = {
-        assert(!cast(f, TimestampType).resolved)
-      }
-      def checkDoubleToTimestamp(d: Double, expected: Long): Unit = {
-        assert(!cast(d, TimestampType).resolved)
-      }
-
-      checkByteToTimestamp(1.toByte, 0L)
-      checkShortToTimestamp(1.toShort, 0L)
-      checkIntToTimestamp(1, 0L)
-      checkLongToTimestamp(1L, 0L)
-      checkDecimalToTimestamp(Decimal(1.5), 0L)
-      checkFloatToTimestamp(1.5f, 0L)
-      checkDoubleToTimestamp(2.1D, 0L)
+  test("cast a timestamp before the epoch 1970-01-01 00:00:00Z") {
+    withDefaultTimeZone(UTC) {
+      val negativeTs = Timestamp.valueOf("1900-05-05 18:34:56.1")
+      assert(negativeTs.getTime < 0)
+      val expectedSecs = Math.floorDiv(negativeTs.getTime, MILLIS_PER_SECOND)
+      checkEvaluation(cast(negativeTs, ByteType), expectedSecs.toByte)
+      checkEvaluation(cast(negativeTs, ShortType), expectedSecs.toShort)
+      checkEvaluation(cast(negativeTs, IntegerType), expectedSecs.toInt)
+      checkEvaluation(cast(negativeTs, LongType), expectedSecs)
     }
   }
 
@@ -1492,7 +1410,7 @@ class CastSuite extends CastSuiteBase {
       checkFloatToTimestamp(1.5f, 0L)
       checkDoubleToTimestamp(2.1D, 0L)
     }
-  }
+  }
 }
 
 /**
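
Usage note, not part of the patch itself: the renamed rebase configs above are plain string configs that accept LEGACY, CORRECTED, or EXCEPTION, with EXCEPTION as the default. The following is a minimal sketch of how they might be set from user code, assuming Spark 3.0+ with the Parquet source on the classpath; the application name, output path, and sample date are illustrative only.

import org.apache.spark.sql.SparkSession

object RebaseModeExample {
  def main(args: Array[String]): Unit = {
    // Build a local session and opt out of the EXCEPTION default so that
    // pre-Gregorian-cutover dates can be written without a rebase error.
    // The read-side mode only matters when the writer of the files is unknown.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("datetime-rebase-mode-example")
      .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
      .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
      // Internal config renamed by the diff; "parquet,orc" is already its default.
      .config("spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources", "parquet,orc")
      .getOrCreate()
    import spark.implicits._

    // 1582-10-04 is the last Julian-calendar day before the Gregorian cutover,
    // so it is exactly the kind of ancient date the rebase modes are about.
    val df = Seq(java.sql.Date.valueOf("1582-10-04")).toDF("d")
    df.write.mode("overwrite").parquet("/tmp/rebase-mode-example")
    spark.read.parquet("/tmp/rebase-mode-example").show()

    spark.stop()
  }
}

If EXCEPTION is left as the default, the same write fails for dates before the 1582-10-15 cutover with an upgrade error that points at these config keys.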