Skip to content

Commit

Permalink
fix: disable checking for uint_8 and uint_16 if complex type readers …
Browse files Browse the repository at this point in the history
…are enabled (#1376)
  • Loading branch information
parthchandra authored Feb 13, 2025
1 parent d885f4a commit 57a4dca
Show file tree
Hide file tree
Showing 8 changed files with 296 additions and 81 deletions.
8 changes: 8 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,14 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_SCAN_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.scan.allowIncompatible")
.doc(
"Comet is not currently fully compatible with Spark for all datatypes. " +
s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
.booleanConf
.createWithDefault(true)

val COMET_EXPR_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.expression.allowIncompatible")
.doc(
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Comet provides the following configuration settings.
| spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true |
| spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| spark.comet.scan.allowIncompatible | Comet is not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | true |
| spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | true |
| spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. | false |
| spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,15 @@ object CometSparkSessionExtensions extends Logging {
org.apache.spark.SPARK_VERSION >= "4.0"
}

def isComplexTypeReaderEnabled(conf: SQLConf): Boolean = {
CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_DATAFUSION
}

def usingDataFusionParquetReader(conf: SQLConf): Boolean = {
isComplexTypeReaderEnabled(conf) && !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf)
}

/** Calculates required memory overhead in MB per executor process for Comet. */
def getCometMemoryOverheadInMiB(sparkConf: SparkConf): Long = {
// `spark.executor.memory` default value is 1g
Expand Down
6 changes: 5 additions & 1 deletion spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.comet

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

trait DataTypeSupport {
Expand All @@ -35,12 +36,15 @@ trait DataTypeSupport {
def isAdditionallySupported(dt: DataType): Boolean = false

private def isGloballySupported(dt: DataType): Boolean = dt match {
case ByteType | ShortType
if CometSparkSessionExtensions.isComplexTypeReaderEnabled(SQLConf.get) &&
!CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
false
case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
true
case t: DataType if t.typeName == "timestamp_ntz" =>
true
true
case _ => false
}

Expand Down
127 changes: 103 additions & 24 deletions spark/src/test/scala/org/apache/comet/CometCastSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {

private val timestampPattern = "0123456789/:T" + whitespaceChars

lazy val usingDataFusionParquetReader: Boolean =
CometSparkSessionExtensions.usingDataFusionParquetReader(conf)

test("all valid cast combinations covered") {
val names = testNames

Expand Down Expand Up @@ -145,88 +148,148 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// CAST from ByteType

test("cast ByteType to BooleanType") {
castTest(generateBytes(), DataTypes.BooleanType)
castTest(
generateBytes(),
DataTypes.BooleanType,
hasIncompatibleType = usingDataFusionParquetReader)
}

test("cast ByteType to ShortType") {
castTest(generateBytes(), DataTypes.ShortType)
castTest(
generateBytes(),
DataTypes.ShortType,
hasIncompatibleType = usingDataFusionParquetReader)
}

test("cast ByteType to IntegerType") {
castTest(generateBytes(), DataTypes.IntegerType)
castTest(
generateBytes(),
DataTypes.IntegerType,
hasIncompatibleType = usingDataFusionParquetReader)
}

test("cast ByteType to LongType") {
castTest(generateBytes(), DataTypes.LongType)
castTest(
generateBytes(),
DataTypes.LongType,
hasIncompatibleType = usingDataFusionParquetReader)
}

test("cast ByteType to FloatType") {
castTest(generateBytes(), DataTypes.FloatType)
castTest(
generateBytes(),
DataTypes.FloatType,
hasIncompatibleType = usingDataFusionParquetReader)
}

test("cast ByteType to DoubleType") {
castTest(generateBytes(), DataTypes.DoubleType)
castTest(
generateBytes(),
DataTypes.DoubleType,
hasIncompatibleType = usingDataFusionParquetReader)
}

test("cast ByteType to DecimalType(10,2)") {
castTest(generateBytes(), DataTypes.createDecimalType(10, 2))
castTest(
generateBytes(),
DataTypes.createDecimalType(10, 2),
hasIncompatibleType = usingDataFusionParquetReader)
}

test("cast ByteType to StringType") {
castTest(generateBytes(), DataTypes.StringType)
castTest(
generateBytes(),
DataTypes.StringType,
hasIncompatibleType = usingDataFusionParquetReader)
}

ignore("cast ByteType to BinaryType") {
castTest(generateBytes(), DataTypes.BinaryType)
castTest(
generateBytes(),
DataTypes.BinaryType,
hasIncompatibleType = usingDataFusionParquetReader)
}

ignore("cast ByteType to TimestampType") {
// input: -1, expected: 1969-12-31 15:59:59.0, actual: 1969-12-31 15:59:59.999999
castTest(generateBytes(), DataTypes.TimestampType)
castTest(
generateBytes(),
DataTypes.TimestampType,
hasIncompatibleType = usingDataFusionParquetReader)
}

// CAST from ShortType

test("cast ShortType to BooleanType") {
castTest(generateShorts(), DataTypes.BooleanType)
castTest(
generateShorts(),
DataTypes.BooleanType,
hasIncompatibleType = usingDataFusionParquetReader)
}

test("cast ShortType to ByteType") {
// https://github.com/apache/datafusion-comet/issues/311
castTest(generateShorts(), DataTypes.ByteType)
castTest(
generateShorts(),
DataTypes.ByteType,
hasIncompatibleType = usingDataFusionParquetReader)
}

test("cast ShortType to IntegerType") {
castTest(generateShorts(), DataTypes.IntegerType)
castTest(
generateShorts(),
DataTypes.IntegerType,
hasIncompatibleType = usingDataFusionParquetReader)
}

test("cast ShortType to LongType") {
castTest(generateShorts(), DataTypes.LongType)
castTest(
generateShorts(),
DataTypes.LongType,
hasIncompatibleType = usingDataFusionParquetReader)
}

test("cast ShortType to FloatType") {
castTest(generateShorts(), DataTypes.FloatType)
castTest(
generateShorts(),
DataTypes.FloatType,
hasIncompatibleType = usingDataFusionParquetReader)
}

test("cast ShortType to DoubleType") {
castTest(generateShorts(), DataTypes.DoubleType)
castTest(
generateShorts(),
DataTypes.DoubleType,
hasIncompatibleType = usingDataFusionParquetReader)
}

test("cast ShortType to DecimalType(10,2)") {
castTest(generateShorts(), DataTypes.createDecimalType(10, 2))
castTest(
generateShorts(),
DataTypes.createDecimalType(10, 2),
hasIncompatibleType = usingDataFusionParquetReader)
}

test("cast ShortType to StringType") {
castTest(generateShorts(), DataTypes.StringType)
castTest(
generateShorts(),
DataTypes.StringType,
hasIncompatibleType = usingDataFusionParquetReader)
}

ignore("cast ShortType to BinaryType") {
castTest(generateShorts(), DataTypes.BinaryType)
castTest(
generateShorts(),
DataTypes.BinaryType,
hasIncompatibleType = usingDataFusionParquetReader)
}

ignore("cast ShortType to TimestampType") {
// input: -1003, expected: 1969-12-31 15:43:17.0, actual: 1969-12-31 15:59:59.998997
castTest(generateShorts(), DataTypes.TimestampType)
castTest(
generateShorts(),
DataTypes.TimestampType,
hasIncompatibleType = usingDataFusionParquetReader)
}

// CAST from integer
Expand Down Expand Up @@ -1083,7 +1146,11 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

private def castTest(input: DataFrame, toType: DataType, testAnsi: Boolean = true): Unit = {
private def castTest(
input: DataFrame,
toType: DataType,
hasIncompatibleType: Boolean = false,
testAnsi: Boolean = true): Unit = {

// we now support the TryCast expression in Spark 3.3
withTempPath { dir =>
Expand All @@ -1093,12 +1160,20 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withSQLConf((SQLConf.ANSI_ENABLED.key, "false")) {
// cast() should return null for invalid inputs when ansi mode is disabled
val df = spark.sql(s"select a, cast(a as ${toType.sql}) from t order by a")
checkSparkAnswerAndOperator(df)
if (hasIncompatibleType) {
checkSparkAnswer(df)
} else {
checkSparkAnswerAndOperator(df)
}

// try_cast() should always return null for invalid inputs
val df2 =
spark.sql(s"select a, try_cast(a as ${toType.sql}) from t order by a")
checkSparkAnswerAndOperator(df2)
if (hasIncompatibleType) {
checkSparkAnswer(df2)
} else {
checkSparkAnswerAndOperator(df2)
}
}

if (testAnsi) {
Expand Down Expand Up @@ -1154,7 +1229,11 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// try_cast() should always return null for invalid inputs
val df2 =
spark.sql(s"select a, try_cast(a as ${toType.sql}) from t order by a")
checkSparkAnswerAndOperator(df2)
if (hasIncompatibleType) {
checkSparkAnswer(df2)
} else {
checkSparkAnswerAndOperator(df2)
}

}
}
Expand Down
40 changes: 40 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,45 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("uint data type support") {
Seq(true, false).foreach { dictionaryEnabled =>
// TODO: Once the question of what to get back from uint_8, uint_16 types is resolved,
// we can also update this test to check for COMET_SCAN_ALLOW_INCOMPATIBLE=true
Seq(false).foreach { allowIncompatible =>
{
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> allowIncompatible.toString) {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "testuint.parquet")
makeParquetFileAllTypes(
path,
dictionaryEnabled = dictionaryEnabled,
Byte.MinValue,
Byte.MaxValue)
withParquetTable(path.toString, "tbl") {
val qry = "select _9 from tbl order by _11"
if (CometSparkSessionExtensions.isComplexTypeReaderEnabled(conf)) {
if (!allowIncompatible) {
checkSparkAnswer(qry)
} else {
// need to convert the values to unsigned values
val expected = (Byte.MinValue to Byte.MaxValue)
.map(v => {
if (v < 0) Byte.MaxValue.toShort - v else v
})
.toDF("a")
checkAnswer(sql(qry), expected)
}
} else {
checkSparkAnswerAndOperator(qry)
}
}
}
}
}
}
}
}

test("null literals") {
val batchSize = 1000
Seq(true, false).foreach { dictionaryEnabled =>
Expand All @@ -142,6 +181,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
checkSparkAnswerAndOperator(sqlString)
}
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.spark.unsafe.types.UTF8String

import com.google.common.primitives.UnsignedLong

import org.apache.comet.CometConf
import org.apache.comet.{CometConf, CometSparkSessionExtensions}
import org.apache.comet.CometSparkSessionExtensions.{isSpark34Plus, isSpark40Plus}

abstract class ParquetReadSuite extends CometTestBase {
Expand Down Expand Up @@ -139,7 +139,10 @@ abstract class ParquetReadSuite extends CometTestBase {
i.toDouble,
DateTimeUtils.toJavaDate(i))
}
checkParquetScan(data)
if (!CometSparkSessionExtensions.isComplexTypeReaderEnabled(
conf) || CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get()) {
checkParquetScan(data)
}
checkParquetFile(data)
}
}
Expand All @@ -159,7 +162,10 @@ abstract class ParquetReadSuite extends CometTestBase {
i.toDouble,
DateTimeUtils.toJavaDate(i))
}
checkParquetScan(data)
if (!CometSparkSessionExtensions.isComplexTypeReaderEnabled(
conf) || CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get()) {
checkParquetScan(data)
}
checkParquetFile(data)
}
}
Expand All @@ -178,7 +184,10 @@ abstract class ParquetReadSuite extends CometTestBase {
DateTimeUtils.toJavaDate(i))
}
val filter = (row: Row) => row.getBoolean(0)
checkParquetScan(data, filter)
if (!CometSparkSessionExtensions.isComplexTypeReaderEnabled(
conf) || CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get()) {
checkParquetScan(data, filter)
}
checkParquetFile(data, filter)
}

Expand Down
Loading

0 comments on commit 57a4dca

Please sign in to comment.